You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/02 03:33:18 UTC
[1/3] TAJO-216: Improve FilterPushDownRule and Implement physical
operators for outer join. (camelia_c via hyunsik)
Updated Branches:
refs/heads/master 406337d9d -> 3e6d684a9
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
new file mode 100644
index 0000000..a946934
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link LeftOuterNLJoinExec}.
+ *
+ * Every test plans a left outer join over the small CSV tables written by {@link #setUp()}.
+ * When the physical planner answers with a {@link LeftOuterHashJoinExec}, the test swaps it for
+ * a {@link LeftOuterNLJoinExec} built over the same children, then counts the produced rows.
+ */
+public class TestLeftOuterNLJoinExec {
+  private static final String TEST_PATH = "target/test-data/TestLeftOuterNLJoinExec";
+
+  /** Queries under test; the index matches the numeric suffix of the test method using it. */
+  static final String[] QUERIES = {
+      // 0: no NULL join keys on either side
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // 1: NULL join keys in the right operand (emp3.job_id)
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id",
+      // 2: NULL join keys in the left operand (emp3.job_id)
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id",
+      // 3: the right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // 4: the left operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  /** Starts the catalog, writes the four fixture tables and creates the analyzer and planner. */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    dep3 = createDep3();
+    job3 = createJob3();
+    emp3 = createEmp3();
+    phone3 = createPhone3();
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  /**
+   * Writes and registers dep3: ten rows (i, "dept_" + i, 1000 + i) for i = 0..9.
+   *
+   * <pre>
+   * dep_id | dep_name | loc_id
+   * --------------------------
+   *      0 | dept_0   | 1000
+   *      1 | dept_1   | 1001
+   *    ... | ...      | ...
+   *      9 | dept_9   | 1009
+   * </pre>
+   */
+  private TableDesc createDep3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("dep_id", Type.INT4);
+    schema.addColumn("dep_name", Type.TEXT);
+    schema.addColumn("loc_id", Type.INT4);
+
+    Datum[][] rows = new Datum[10][];
+    for (int i = 0; i < rows.length; i++) {
+      rows[i] = new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) };
+    }
+    return writeAndRegister("dep3", schema, rows);
+  }
+
+  /**
+   * Writes and registers job3.
+   *
+   * <pre>
+   * job_id | job_title
+   * ------------------
+   *    101 | job_101
+   *    102 | job_102
+   *    103 | job_103
+   * </pre>
+   */
+  private TableDesc createJob3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("job_id", Type.INT4);
+    schema.addColumn("job_title", Type.TEXT);
+
+    Datum[][] rows = new Datum[3][];
+    for (int i = 0; i < rows.length; i++) {
+      int jobId = 101 + i;
+      rows[i] = new Datum[] {
+          DatumFactory.createInt4(jobId),
+          DatumFactory.createText("job_" + jobId) };
+    }
+    return writeAndRegister("job3", schema, rows);
+  }
+
+  /**
+   * Writes and registers emp3. Rows are listed in the order they are appended.
+   *
+   * <pre>
+   * emp_id | first_name   | last_name   | dep_id | salary | job_id
+   * --------------------------------------------------------------
+   *     11 | firstname_11 | lastname_11 |      1 |    123 | 101
+   *     21 | firstname_21 | lastname_21 |      1 |    123 | 101
+   *     13 | firstname_13 | lastname_13 |      3 |    369 | 103
+   *     23 | firstname_23 | lastname_23 |      3 |    369 | 103
+   *     15 | firstname_15 | lastname_15 |      5 |    615 | null
+   *     17 | firstname_17 | lastname_17 |      7 |    861 | null
+   *     19 | firstname_19 | lastname_19 |      9 |   1107 | null
+   * </pre>
+   */
+  private TableDesc createEmp3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("emp_id", Type.INT4);
+    schema.addColumn("first_name", Type.TEXT);
+    schema.addColumn("last_name", Type.TEXT);
+    schema.addColumn("dep_id", Type.INT4);
+    schema.addColumn("salary", Type.FLOAT4);
+    schema.addColumn("job_id", Type.INT4);
+
+    Datum[][] rows = new Datum[7][];
+    int next = 0;
+    // Departments 1 and 3 each get two employees that reference an existing job.
+    for (int dep = 1; dep < 4; dep += 2) {
+      rows[next++] = emp3Row(10 + dep, dep, DatumFactory.createInt4(100 + dep));
+      rows[next++] = emp3Row(20 + dep, dep, DatumFactory.createInt4(100 + dep));
+    }
+    // Departments 5, 7 and 9 each get one employee whose job_id is NULL.
+    for (int dep = 5; dep < 10; dep += 2) {
+      rows[next++] = emp3Row(10 + dep, dep, DatumFactory.createNullDatum());
+    }
+    return writeAndRegister("emp3", schema, rows);
+  }
+
+  /** Builds one emp3 row; the salary is always 123 * depId. */
+  private static Datum[] emp3Row(int empId, int depId, Datum jobId) {
+    return new Datum[] {
+        DatumFactory.createInt4(empId),
+        DatumFactory.createText("firstname_" + empId),
+        DatumFactory.createText("lastname_" + empId),
+        DatumFactory.createInt4(depId),
+        DatumFactory.createFloat4(123 * depId),
+        jobId };
+  }
+
+  /** Writes and registers phone3 (emp_id, phone_number), which deliberately has no rows. */
+  private TableDesc createPhone3() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("emp_id", Type.INT4);
+    schema.addColumn("phone_number", Type.TEXT);
+
+    return writeAndRegister("phone3", schema, new Datum[0][]);
+  }
+
+  /**
+   * Writes {@code rows} to {@code <testDir>/<name>.csv} and adds the table to the catalog.
+   *
+   * @param name   table name, also used as the CSV file's base name
+   * @param schema column layout of the table
+   * @param rows   one {@code Datum[]} per row, in append order; may be empty
+   * @return the descriptor that was added to the catalog
+   */
+  private TableDesc writeAndRegister(String name, Schema schema, Datum[][] rows)
+      throws Exception {
+    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    Path path = new Path(testDir, name + ".csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
+    try {
+      appender.init();
+      Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+      for (Datum[] row : rows) {
+        tuple.put(row);
+        appender.addTuple(tuple);
+      }
+      appender.flush();
+    } finally {
+      // Close the appender even when writing fails.
+      appender.close();
+    }
+
+    TableDesc desc = CatalogUtil.newTableDesc(name, meta, path);
+    catalog.addTable(desc);
+    return desc;
+  }
+
+  /** Splits a whole table into fragments using the largest possible fragment size. */
+  private Fragment[] split(String tableName, TableDesc table) throws IOException {
+    return StorageManager.splitNG(conf, tableName, table.getMeta(), table.getPath(),
+        Integer.MAX_VALUE);
+  }
+
+  /**
+   * Plans {@code query} over {@code fragments} and makes sure the join runs as a nested-loop
+   * left outer join.
+   *
+   * @param query       SQL text to plan
+   * @param fragments   input fragments of both join operands
+   * @param workDirName working directory handed to the task attempt context
+   * @return the root executor of the physical plan (a {@link ProjectionExec})
+   */
+  private PhysicalExec planWithNLJoin(String query, Fragment[] fragments, String workDirName)
+      throws IOException, PlanningException {
+    Path workDir = CommonTestingUtil.getTestDir(workDirName);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), fragments, workDir);
+    Expr expr = analyzer.parse(query);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    ProjectionExec proj = (ProjectionExec) phyPlanner.createPlan(ctx, plan);
+
+    // The planner may pick the hash-based operator; swap in the nested-loop operator under test.
+    if (proj.getChild() instanceof LeftOuterHashJoinExec) {
+      LeftOuterHashJoinExec hashJoin = (LeftOuterHashJoinExec) proj.getChild();
+      proj.setChild(new LeftOuterNLJoinExec(ctx, hashJoin.getPlan(),
+          hashJoin.getLeftChild(), hashJoin.getRightChild()));
+    }
+    return proj;
+  }
+
+  /**
+   * Initializes {@code exec}, pulls tuples until it returns null, and closes it.
+   *
+   * @param exec            executor to run
+   * @param verifyExhausted when true, also asserts that one more {@code next()} returns null
+   * @return the number of tuples produced
+   */
+  private static int executeAndCount(PhysicalExec exec, boolean verifyExhausted)
+      throws IOException {
+    exec.init();
+    try {
+      int count = 0;
+      // TODO check tuple contents, not only the row count
+      while (exec.next() != null) {
+        count++;
+      }
+      if (verifyExhausted) {
+        assertNull(exec.next());
+      }
+      return count;
+    } finally {
+      exec.close();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(split("dep3", dep3), split("emp3", emp3));
+    PhysicalExec exec = planWithNLJoin(QUERIES[0], merged,
+        "target/test-data/TestLeftOuterNLJoinExec0");
+    assertEquals(12, executeAndCount(exec, true));
+  }
+
+  @Test
+  public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(split("job3", job3), split("emp3", emp3));
+    PhysicalExec exec = planWithNLJoin(QUERIES[1], merged,
+        "target/test-data/TestLeftOuter_NLJoinExec1");
+    assertEquals(5, executeAndCount(exec, false));
+  }
+
+  @Test
+  public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(split("emp3", emp3), split("job3", job3));
+    PhysicalExec exec = planWithNLJoin(QUERIES[2], merged,
+        "target/test-data/TestLeftOuter_NLJoinExec2");
+    assertEquals(7, executeAndCount(exec, false));
+  }
+
+  @Test
+  public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(split("emp3", emp3), split("phone3", phone3));
+    PhysicalExec exec = planWithNLJoin(QUERIES[3], merged,
+        "target/test-data/TestLeftOuter_NLJoinExec3");
+    assertEquals(7, executeAndCount(exec, false));
+  }
+
+  @Test
+  public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
+    Fragment[] merged = TUtil.concat(split("phone3", phone3), split("emp3", emp3));
+    PhysicalExec exec = planWithNLJoin(QUERIES[4], merged,
+        "target/test-data/TestLeftOuter_NLJoinExec4");
+    assertEquals(0, executeAndCount(exec, false));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index ee5ab83..53df9f7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -179,7 +179,7 @@ public class TestMergeJoinExec {
assertTrue(i == tuple.getInt(0).asInt4());
assertTrue(i == tuple.getInt(1).asInt4());
assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
- assertTrue(10 + i == tuple.getInt(3).asInt4());
+ assertTrue((10 + i) == tuple.getInt(3).asInt4());
i += 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
new file mode 100644
index 0000000..d237f0c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// this is not a physical operator in itself, but it uses the LeftOuterHashJoinExec with switched inputs order
+public class TestRightOuterHashJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestRightOuterHashJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private AbstractStorageManager sm;
+ private Path testDir;
+
+ private TableDesc dep3;
+ private TableDesc job3;
+ private TableDesc emp3;
+
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dep_0 | 1000
+ // 1 | dep_1 | 1001
+ // 2 | dep_2 | 1002
+ // 3 | dep_3 | 1003
+ // 4 | dep_4 | 1004
+ // 5 | dep_5 | 1005
+ // 6 | dep_6 | 1006
+ // 7 | dep_7 | 1007
+ // 8 | dep_8 | 1008
+ // 9 | dep_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+ StoreType.CSV);
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+ appender1.init();
+ Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+ catalog.addTable(dep3);
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+ StoreType.CSV);
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+ appender2.init();
+ Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+ catalog.addTable(job3);
+
+
+
+ //---------------------emp3 --------------------
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | fn_11 | ln_11 | 1 | 123 | 101
+ // 13 | fn_13 | ln_13 | 3 | 369 | 103
+ // 15 | fn_15 | ln_15 | 5 | 615 | null
+ // 17 | fn_17 | ln_17 | 7 | 861 | null
+ // 19 | fn_19 | ln_19 | 9 | 1107 | null
+ // 21 | fn_21 | ln_21 | 1 | 123 | 101
+ // 23 | fn_23 | ln_23 | 3 | 369 | 103
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+ appender3.init();
+ Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+ catalog.addTable(emp3);
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id", //0 no nulls
+ "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id", //1 nulls on the left operand
+ "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id" //2 nulls on the right side
+ };
+
+ @Test
+ public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
+ Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(),
+ Integer.MAX_VALUE);
+
+ Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(12, count);
+ }
+ }
+
+
+ @Test
+ public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
+ Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+ Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+
+ Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ Expr expr = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(5, count);
+ }
+ }
+
+ @Test
+ public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
+
+ Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+
+ Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ Expr expr = analyzer.parse(QUERIES[2]);
+ LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+ }
+
+
+}
+ //--camelia
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
new file mode 100644
index 0000000..a37d409
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestRightOuterMergeJoinExec {
+ // Configuration taken from the testing cluster in setUp().
+ private TajoConf conf;
+ // Directory (relative to the working directory) that holds the CSV fixture files.
+ private final String TEST_PATH = "target/test-data/TestRightOuterMergeJoinExec";
+ // Testing cluster; only its catalog service is started (see setUp/tearDown).
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ // SQL parser and logical planner used by every test to build the plan under test.
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ // Storage manager handed to the physical planner.
+ private AbstractStorageManager sm;
+ private Path testDir;
+ // NOTE(review): not referenced anywhere in this class.
+ private static final int UNGENERATED_PID = -1;
+
+ // Descriptors of the fixture tables created in setUp().
+ private TableDesc dep3;
+ private TableDesc dep4;
+ private TableDesc job3;
+ private TableDesc emp3;
+ private TableDesc phone3;
+
+ /**
+  * Starts the catalog cluster and writes the CSV fixture tables used by the tests.
+  *
+  * <pre>
+  * dep3 / dep4 (dep_id | dep_name | loc_id): rows i = 0..9 (dep3) or 0..10 (dep4)
+  *     i | dept_i | 1000 + i
+  *
+  * job3 (job_id | job_title):
+  *     101 | job_101
+  *     102 | job_102
+  *     103 | job_103
+  *
+  * emp3 (emp_id | first_name | last_name | dep_id | salary | job_id), in file order:
+  *     11 | firstname_11 | lastname_11 | 1 |  123 | 101
+  *     21 | firstname_21 | lastname_21 | 1 |  123 | 101
+  *     13 | firstname_13 | lastname_13 | 3 |  369 | 103
+  *     23 | firstname_23 | lastname_23 | 3 |  369 | 103
+  *     15 | firstname_15 | lastname_15 | 5 |  615 | null
+  *     17 | firstname_17 | lastname_17 | 7 |  861 | null
+  *     19 | firstname_19 | lastname_19 | 9 | 1107 | null
+  *
+  * phone3 (emp_id | phone_number): empty, no rows
+  * </pre>
+  */
+ @Before
+ public void setUp() throws Exception {
+   util = new TajoTestingCluster();
+   util.initTestDir();
+   catalog = util.startCatalogCluster().getCatalog();
+   testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+   conf = util.getConfiguration();
+   sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+   // tables are registered in the catalog in this order: dep3, dep4, job3, emp3, phone3
+   dep3 = createDepTable("dep3", 10);
+   dep4 = createDepTable("dep4", 11);
+   job3 = createJobTable();
+   emp3 = createEmpTable();
+   phone3 = createPhoneTable();
+
+   analyzer = new SQLAnalyzer();
+   planner = new LogicalPlanner(catalog);
+ }
+
+ /** Opens and initializes a CSV appender for the given table file. */
+ private Appender openAppender(TableMeta meta, Path path) throws Exception {
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
+   appender.init();
+   return appender;
+ }
+
+ /** Flushes and closes the appender, then registers the finished table in the catalog. */
+ private TableDesc finishTable(String tableName, TableMeta meta, Path path, Appender appender)
+     throws Exception {
+   appender.flush();
+   appender.close();
+   TableDesc desc = CatalogUtil.newTableDesc(tableName, meta, path);
+   catalog.addTable(desc);
+   return desc;
+ }
+
+ /** Creates a department table with rows (i, "dept_" + i, 1000 + i) for i in [0, numRows). */
+ private TableDesc createDepTable(String tableName, int numRows) throws Exception {
+   Schema schema = new Schema();
+   schema.addColumn("dep_id", Type.INT4);
+   schema.addColumn("dep_name", Type.TEXT);
+   schema.addColumn("loc_id", Type.INT4);
+
+   TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+   Path path = new Path(testDir, tableName + ".csv");
+   Appender appender = openAppender(meta, path);
+   Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+   for (int i = 0; i < numRows; i++) {
+     tuple.put(new Datum[] { DatumFactory.createInt4(i),
+         DatumFactory.createText("dept_" + i),
+         DatumFactory.createInt4(1000 + i) });
+     appender.addTuple(tuple);
+   }
+   return finishTable(tableName, meta, path, appender);
+ }
+
+ /** Creates job3 with the rows (101, "job_101"), (102, "job_102"), (103, "job_103"). */
+ private TableDesc createJobTable() throws Exception {
+   Schema schema = new Schema();
+   schema.addColumn("job_id", Type.INT4);
+   schema.addColumn("job_title", Type.TEXT);
+
+   TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+   Path path = new Path(testDir, "job3.csv");
+   Appender appender = openAppender(meta, path);
+   Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+   for (int i = 1; i < 4; i++) {
+     int jobId = 100 + i;
+     tuple.put(new Datum[] { DatumFactory.createInt4(jobId),
+         DatumFactory.createText("job_" + jobId) });
+     appender.addTuple(tuple);
+   }
+   return finishTable("job3", meta, path, appender);
+ }
+
+ /** Builds one emp3 row; the salary is always 123 * depId. */
+ private static Datum[] empRow(int empId, int depId, Datum jobId) {
+   return new Datum[] { DatumFactory.createInt4(empId),
+       DatumFactory.createText("firstname_" + empId),
+       DatumFactory.createText("lastname_" + empId),
+       DatumFactory.createInt4(depId),
+       DatumFactory.createFloat4(123 * depId),
+       jobId };
+ }
+
+ /**
+  * Creates emp3: two employees (10 + i and 20 + i) with job 100 + i for each of the
+  * departments i = 1, 3, then one employee (10 + i) without a job for i = 5, 7, 9.
+  */
+ private TableDesc createEmpTable() throws Exception {
+   Schema schema = new Schema();
+   schema.addColumn("emp_id", Type.INT4);
+   schema.addColumn("first_name", Type.TEXT);
+   schema.addColumn("last_name", Type.TEXT);
+   schema.addColumn("dep_id", Type.INT4);
+   schema.addColumn("salary", Type.FLOAT4);
+   schema.addColumn("job_id", Type.INT4);
+
+   TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+   Path path = new Path(testDir, "emp3.csv");
+   Appender appender = openAppender(meta, path);
+   Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
+
+   for (int i = 1; i < 4; i += 2) {
+     tuple.put(empRow(10 + i, i, DatumFactory.createInt4(100 + i)));
+     appender.addTuple(tuple);
+
+     tuple.put(empRow(20 + i, i, DatumFactory.createInt4(100 + i)));
+     appender.addTuple(tuple);
+   }
+
+   for (int i = 5; i < 10; i += 2) {
+     tuple.put(empRow(10 + i, i, DatumFactory.createNullDatum()));
+     appender.addTuple(tuple);
+   }
+   return finishTable("emp3", meta, path, appender);
+ }
+
+ /** Creates phone3, which deliberately contains no rows. */
+ private TableDesc createPhoneTable() throws Exception {
+   Schema schema = new Schema();
+   schema.addColumn("emp_id", Type.INT4);
+   schema.addColumn("phone_number", Type.TEXT);
+
+   TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+   Path path = new Path(testDir, "phone3.csv");
+   Appender appender = openAppender(meta, path);
+   return finishTable("phone3", meta, path, appender);
+ }
+
+ /**
+ * Shuts down the catalog cluster that setUp() started.
+ */
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ /**
+ * Right outer join queries over the fixture tables; each test method below parses the
+ * entry whose index matches the number in its name.
+ */
+ String[] QUERIES = {
+ // [0] no nulls
+ "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id",
+ // [1] nulls on the left operand
+ "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id",
+ // [2] nulls on the right side
+ "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id",
+ // [3] no nulls, right continues after left
+ "select dep4.dep_id, dep_name, emp_id, salary from emp3 right outer join dep4 on dep4.dep_id = emp3.dep_id",
+ // [4] one operand is empty
+ "select emp3.emp_id, first_name, phone_number from emp3 right outer join phone3 on emp3.emp_id = phone3.emp_id",
+ // [5] one operand is empty
+ "select phone_number, emp3.emp_id, first_name from phone3 right outer join emp3 on emp3.emp_id = phone3.emp_id"
+ };
+
+ /**
+  * Plans QUERIES[0] (emp3 right outer join dep3) with the merge join algorithm enforced,
+  * checks that the planner produced a {@link RightOuterMergeJoinExec} and that draining
+  * the executor yields 12 rows.
+  */
+ @Test
+ public final void testRightOuterMergeJoin0() throws IOException, PlanningException {
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[0])).getRootBlock().getRoot();
+   JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] depFragments = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+   Fragment[] allFragments = TUtil.concat(empFragments, depFragments);
+
+   Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0");
+   TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+   taskContext.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec exec = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+   assertTrue(((ProjectionExec) exec).getChild() instanceof RightOuterMergeJoinExec);
+
+   exec.init();
+   int rowCount = 0;
+   for (Tuple row = exec.next(); row != null; row = exec.next()) {
+     //TODO check contents
+     rowCount++;
+   }
+   // an exhausted executor must keep reporting end of stream
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(12, rowCount);
+ }
+
+
+ /**
+  * Plans QUERIES[1] (emp3 right outer join job3) with the merge join algorithm enforced,
+  * checks that the planner produced a {@link RightOuterMergeJoinExec} and that draining
+  * the executor yields 5 rows.
+  */
+ @Test
+ public final void testRightOuter_MergeJoin1() throws IOException, PlanningException {
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[1])).getRootBlock().getRoot();
+   JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] jobFragments = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+   Fragment[] allFragments = TUtil.concat(jobFragments, empFragments);
+
+   Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
+   TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+   taskContext.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec exec = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+   assertTrue(((ProjectionExec) exec).getChild() instanceof RightOuterMergeJoinExec);
+
+   exec.init();
+   int rowCount = 0;
+   for (Tuple row = exec.next(); row != null; row = exec.next()) {
+     //TODO check contents
+     rowCount++;
+   }
+   // an exhausted executor must keep reporting end of stream
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(5, rowCount);
+ }
+
+ /**
+  * Plans QUERIES[2] (job3 right outer join emp3) with the merge join algorithm enforced,
+  * checks that the planner produced a {@link RightOuterMergeJoinExec} and that draining
+  * the executor yields 7 rows.
+  */
+ @Test
+ public final void testRightOuterMergeJoin2() throws IOException, PlanningException {
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[2])).getRootBlock().getRoot();
+   JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] jobFragments = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+   Fragment[] allFragments = TUtil.concat(jobFragments, empFragments);
+
+   Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
+   TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+   taskContext.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec exec = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+   assertTrue(((ProjectionExec) exec).getChild() instanceof RightOuterMergeJoinExec);
+
+   exec.init();
+   int rowCount = 0;
+   for (Tuple row = exec.next(); row != null; row = exec.next()) {
+     //TODO check contents
+     rowCount++;
+   }
+   // an exhausted executor must keep reporting end of stream
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(7, rowCount);
+ }
+
+
+ /**
+  * Plans QUERIES[3] (emp3 right outer join dep4) with the merge join algorithm enforced,
+  * checks that the planner produced a {@link RightOuterMergeJoinExec} and that draining
+  * the executor yields 13 rows.
+  */
+ @Test
+ public final void testRightOuter_MergeJoin3() throws IOException, PlanningException {
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[3])).getRootBlock().getRoot();
+   JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] depFragments = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+   Fragment[] allFragments = TUtil.concat(empFragments, depFragments);
+
+   Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
+   TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+   taskContext.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec exec = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+   assertTrue(((ProjectionExec) exec).getChild() instanceof RightOuterMergeJoinExec);
+
+   exec.init();
+   int rowCount = 0;
+   for (Tuple row = exec.next(); row != null; row = exec.next()) {
+     //TODO check contents
+     rowCount++;
+   }
+   // an exhausted executor must keep reporting end of stream
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(13, rowCount);
+ }
+
+ /**
+  * Plans QUERIES[4] (emp3 right outer join the empty phone3 table) with the merge join
+  * algorithm enforced, checks that the planner produced a {@link RightOuterMergeJoinExec}
+  * and that draining the executor yields no rows.
+  */
+ @Test
+ public final void testRightOuter_MergeJoin4() throws IOException, PlanningException {
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[4])).getRootBlock().getRoot();
+   JoinNode join = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(join.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] empFragments = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] phoneFragments = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] allFragments = TUtil.concat(empFragments, phoneFragments);
+
+   Path outputDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4");
+   TaskAttemptContext taskContext = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), allFragments, outputDir);
+   taskContext.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec exec = new PhysicalPlannerImpl(conf, sm).createPlan(taskContext, rootNode);
+   assertTrue(((ProjectionExec) exec).getChild() instanceof RightOuterMergeJoinExec);
+
+   exec.init();
+   int rowCount = 0;
+   for (Tuple row = exec.next(); row != null; row = exec.next()) {
+     //TODO check contents
+     rowCount++;
+   }
+   // an exhausted executor must keep reporting end of stream
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(0, rowCount);
+ }
+
+ /**
+  * Plans QUERIES[5] (the empty phone3 table right outer join emp3) with the merge join
+  * algorithm enforced, checks that the planner produced a {@link RightOuterMergeJoinExec}
+  * and that draining the executor yields 7 rows.
+  *
+  * Like the sibling tests, it also verifies that the exhausted executor keeps returning
+  * {@code null}.
+  */
+ @Test
+ public final void testRightOuterMergeJoin5() throws IOException, PlanningException {
+   Expr expr = analyzer.parse(QUERIES[5]);
+   LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+   JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+   Enforcer enforcer = new Enforcer();
+   enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+   Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5");
+   TaskAttemptContext ctx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+   ctx.setEnforcer(enforcer);
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+   ProjectionExec proj = (ProjectionExec) exec;
+   assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+   int count = 0;
+   exec.init();
+
+   while (exec.next() != null) {
+     //TODO check contents
+     count++;
+   }
+   // same end-of-stream check as the other merge join tests in this class
+   assertNull(exec.next());
+   exec.close();
+   assertEquals(7, count);
+ }
+}
[2/3] TAJO-216: Improve FilterPushDownRule and Implement physical
operators for outer join. (camelia_c via hyunsik)
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
new file mode 100644
index 0000000..c8df5c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode joinNode;
+ private EvalNode joinQual;
+ private EvalContext qualCtx;
+
+ // temporary tuples and state for the sort-merge join
+ private FrameTuple frameTuple;
+ // current cursor tuple of the left / right child; null until read or once that child is exhausted
+ private Tuple leftTuple = null;
+ private Tuple rightTuple = null;
+ // single output tuple instance that next() fills and returns on every call
+ private Tuple outTuple = null;
+ // buffered left tuple that is currently being paired with the buffered right tuples
+ private Tuple nextLeft = null;
+
+ // copies of the consecutive left / right tuples that share the currently matched key
+ private final List<Tuple> leftTupleSlots;
+ private final List<Tuple> innerTupleSlots;
+
+ // compares a left tuple against a right tuple
+ private JoinTupleComparator joinComparator = null;
+ // [0] compares two left tuples, [1] compares two right tuples
+ private TupleComparator[] tupleComparator = null;
+
+ private final static int INITIAL_TUPLE_SLOT = 10000;
+
+ // set once either child has run out of tuples
+ private boolean end = false;
+
+ // projection
+ private final Projector projector;
+ private final EvalContext [] evalContexts;
+
+ // NOTE(review): rightNumCols is never assigned in this class.
+ private int rightNumCols;
+ // column count of the left child; size of the null padding used for unmatched right tuples
+ private int leftNumCols;
+ // index of the next buffered tuple to emit; -1 while no round is in progress
+ private int posRightTupleSlots = -1;
+ private int posLeftTupleSlots = -1;
+ // true when 'end' was raised while filling the slots, i.e. the slots still have to be emitted
+ private boolean endInPopulationStage = false;
+ // true once the first read from the right child has been attempted
+ private boolean initRightDone = false;
+
+ public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+ PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+ Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+ "but there is no join condition");
+ this.joinNode = plan;
+ this.joinQual = plan.getJoinQual();
+ this.qualCtx = this.joinQual.newContext();
+
+ this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ SortSpec[][] sortSpecs = new SortSpec[2][];
+ sortSpecs[0] = outerSortKey;
+ sortSpecs[1] = innerSortKey;
+
+ this.joinComparator = new JoinTupleComparator(outer.getSchema(), inner.getSchema(), sortSpecs);
+ this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+ plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+
+ // for projection
+ this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.evalContexts = projector.renew();
+
+ // for join
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.getColumnNum());
+
+ leftNumCols = outer.getSchema().getColumnNum();
+ }
+
+ /**
+ * @return the logical join node this executor was constructed with
+ */
+ public JoinNode getPlan() {
+ return this.joinNode;
+ }
+
+ /**
+  * Builds a fresh tuple with {@code columnNum} fields, each of them set to the NULL datum.
+  *
+  * @param columnNum number of fields of the tuple to create
+  * @return the newly allocated, all-NULL tuple
+  */
+ private Tuple createNullPaddedTuple(int columnNum){
+   VTuple padded = new VTuple(columnNum);
+   int field = 0;
+   while (field < columnNum) {
+     padded.put(field, DatumFactory.createNullDatum());
+     field++;
+   }
+   return padded;
+ }
+
+ /**
+ * Produces the next output tuple of the right outer sort-merge join.
+ *
+ * Each pass of the loop either starts a new round or keeps emitting pairs from the slots
+ * that the current round filled. A new round walks through these stages:
+ * <ul>
+ * <li>finalizing stage: once {@code end} is set, every remaining right tuple is emitted
+ * with a null-padded left side; {@code null} is returned when no right tuple is left</li>
+ * <li>initialization stage: reads a tuple from each child that has no current tuple</li>
+ * <li>move forwarding stage: advances the cursors until the join comparator reports a match;
+ * while the comparator reports {@code cmp > 0} the current right tuple is emitted with a
+ * null-padded left side</li>
+ * <li>population stage: buffers all consecutive left and right tuples sharing the matched key</li>
+ * </ul>
+ * Afterwards every buffered left tuple is paired with every buffered right tuple, one pair per call.
+ *
+ * The returned object is always the same {@code outTuple} instance, refilled on each call.
+ *
+ * @return the projected output tuple, or {@code null} when the join is exhausted
+ * @throws IOException presumably raised by the child executors while fetching tuples
+ */
+ public Tuple next() throws IOException {
+ Tuple previous;
+
+ for (;;) {
+ // a new round starts when no slots are in use (-1/-1) or both slot lists were fully consumed
+ boolean newRound = false;
+ if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+ newRound = true;
+ }
+ if ((posRightTupleSlots == innerTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+ newRound = true;
+ }
+
+ if (newRound) {
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN FINALIZING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // The finalizing stage, where remaining tuples on the only right are transformed into left-padded results
+ if (end) {
+ if (initRightDone == false) {
+ // maybe the left operand was empty => the right one didn't have the chance to initialize
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ }
+
+ if(rightTuple == null) {
+ return null;
+ } else {
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+
+ // we simulate we found a match, which is exactly the null padded one
+ rightTuple = rightChild.next();
+
+ return outTuple;
+ }
+ }
+ //////////////////////////////////////////////////////////////////////
+ // END FINALIZING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN INITIALIZATION STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // This stage reads the first tuple on each side
+ if (leftTuple == null) {
+ leftTuple = leftChild.next();
+
+ if (leftTuple == null) {
+ // left side is empty/exhausted: restart the loop so the finalizing stage takes over
+ end = true;
+ continue;
+ }
+ }
+
+ if(rightTuple == null){
+ rightTuple = rightChild.next();
+
+ if(rightTuple != null){
+ initRightDone = true;
+ }
+ else {
+ // right side is empty/exhausted: the finalizing stage will return null
+ initRightDone = true;
+ end = true;
+ continue;
+ }
+ }
+ //////////////////////////////////////////////////////////////////////
+ // END INITIALIZATION STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // reset tuple slots for a new round
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN MOVE FORWARDING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // This stage moves forward a tuple cursor on each side relation until a match
+ // is found
+ int cmp;
+ while ((end != true) && ((cmp = joinComparator.compare(leftTuple, rightTuple)) != 0)) {
+
+ // if right is lower than the left tuple, it means that all right tuples s.t. cmp <= 0 are
+ // matched tuples.
+ if (cmp > 0) {
+ // before getting a new tuple from the right, a left null padded tuple should be built
+ // output a tuple with the nulls padded left tuple
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+
+ // we simulate we found a match, which is exactly the null padded one
+ // BEFORE RETURN, MOVE FORWARD
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ end = true;
+ }
+ return outTuple;
+
+ } else if (cmp < 0) {
+ // If the left tuple is lower than the right tuple, just move forward the left tuple cursor.
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ end = true;
+ // in original algorithm we had return null ,
+ // but now we need to continue the end processing phase for remaining unprocessed right tuples
+ }
+ } // if (cmp<0)
+ } // while
+ //////////////////////////////////////////////////////////////////////
+ // END MOVE FORWARDING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // once a match is found, retain all tuples with this key in tuple slots on each side
+ if(!end) {
+ endInPopulationStage = false;
+
+ boolean endOuter = false;
+ boolean endInner = false;
+
+ // buffer copies of the left tuples until the left key changes or the left child ends
+ previous = new VTuple(leftTuple);
+ do {
+ leftTupleSlots.add(new VTuple(leftTuple));
+ leftTuple = leftChild.next();
+ if( leftTuple == null) {
+ endOuter = true;
+ }
+ } while ((endOuter != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+ posLeftTupleSlots = 0;
+
+ // same for the right side
+ previous = new VTuple(rightTuple);
+
+ do {
+ innerTupleSlots.add(new VTuple(rightTuple));
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ endInner = true;
+ }
+
+ } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+ posRightTupleSlots = 0;
+
+ // an input ended while buffering: remember it so the buffered pairs are still emitted
+ if ((endOuter == true) || (endInner == true)) {
+ end = true;
+ endInPopulationStage = true;
+ }
+ } // if end false
+ } // if newRound
+
+
+ // Now output result matching tuples from the slots
+ // if either we haven't reached end on neither side, or we did reach end on one(or both) sides
+ // but that happened in the slots population step (i.e. refers to next round)
+
+ if ((end == false) || ((end == true) && (endInPopulationStage == true))){
+
+ // first visit of this round: pick the first buffered left tuple
+ if(posLeftTupleSlots == 0){
+ nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+ }
+
+
+ if(posRightTupleSlots <= (innerTupleSlots.size() -1)) {
+
+ Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+
+ frameTuple.set(nextLeft, aTuple);
+ // NOTE(review): the outcome of this qual evaluation is not inspected in this method
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ return outTuple;
+
+ } else {
+ // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+ if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
+ //rewind the right slots position
+ posRightTupleSlots = 0;
+ Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+
+ frameTuple.set(nextLeft, aTuple);
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ return outTuple;
+ }
+ // otherwise both slot lists are consumed; the next loop pass starts a new round
+ }
+ } // the second if end false
+ } // for
+ }
+
+ /**
+  * Rewinds the join so that the next call to {@link #next()} starts over.
+  *
+  * Besides the tuple slots, the cursor tuples and the end-of-input flags have to be reset.
+  * Otherwise an executor that had already reached its end would go straight into the
+  * finalizing stage, and a partially consumed one would keep joining against tuples
+  * read before the rescan.
+  *
+  * @throws IOException if the parent class fails to rescan
+  */
+ @Override
+ public void rescan() throws IOException {
+   super.rescan();
+
+   // buffered tuples of the current round
+   leftTupleSlots.clear();
+   innerTupleSlots.clear();
+   posRightTupleSlots = -1;
+   posLeftTupleSlots = -1;
+
+   // cursor tuples read before the rescan are stale
+   leftTuple = null;
+   rightTuple = null;
+   nextLeft = null;
+
+   // end-of-input bookkeeping
+   end = false;
+   endInPopulationStage = false;
+   initRightDone = false;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index a33cbd7..22bcd06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -29,6 +29,13 @@ import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.engine.eval.EvalType;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.catalog.Column;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.Iterator;
+
import java.util.List;
import java.util.Stack;
@@ -85,11 +92,112 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
return selNode;
}
+ /**
+  * Tells whether the given join type is one of the outer joins (left, right or full).
+  */
+ private boolean isOuterJoin(JoinType joinType) {
+   if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.RIGHT_OUTER) {
+     return true;
+   }
+   return joinType == JoinType.FULL_OUTER;
+ }
+
public LogicalNode visitJoin(LogicalPlan plan, JoinNode joinNode, Stack<LogicalNode> stack, List<EvalNode> cnf)
throws PlanningException {
LogicalNode left = joinNode.getRightChild();
LogicalNode right = joinNode.getLeftChild();
+ // here we should stop selection pushdown on the null supplying side(s) of an outer join
+ // get the two operands of the join operation as well as the join type
+ JoinType joinType = joinNode.getJoinType();
+ EvalNode joinQual = joinNode.getJoinQual();
+ if (joinQual != null && isOuterJoin(joinType)) {
+
+ // if both are fields
+ if (joinQual.getLeftExpr().getType() == EvalType.FIELD && joinQual.getRightExpr().getType() == EvalType.FIELD) {
+
+ String leftTableName = ((FieldEval) joinQual.getLeftExpr()).getTableId();
+ String rightTableName = ((FieldEval) joinQual.getRightExpr()).getTableId();
+ List<String> nullSuppliers = Lists.newArrayList();
+ String [] leftLineage = PlannerUtil.getLineage(joinNode.getLeftChild());
+ String [] rightLineage = PlannerUtil.getLineage(joinNode.getRightChild());
+ Set<String> leftTableSet = Sets.newHashSet(leftLineage);
+ Set<String> rightTableSet = Sets.newHashSet(rightLineage);
+
+ // some verification
+ if (joinType == JoinType.FULL_OUTER) {
+ nullSuppliers.add(leftTableName);
+ nullSuppliers.add(rightTableName);
+
+ // verify that these null suppliers are indeed in the left and right sets
+ if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+
+ } else if (joinType == JoinType.LEFT_OUTER) {
+ nullSuppliers.add(((ScanNode)joinNode.getRightChild()).getTableName());
+ //verify that this null supplier is indeed in the right sub-tree
+ if (!rightTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ } else if (joinType == JoinType.RIGHT_OUTER) {
+ if (((ScanNode)joinNode.getRightChild()).getTableName().equals(rightTableName)) {
+ nullSuppliers.add(leftTableName);
+ } else {
+ nullSuppliers.add(rightTableName);
+ }
+
+ // verify that this null supplier is indeed in the left sub-tree
+ if (!leftTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ }
+
+ // retain in this outer join node's JoinQual those selection predicates
+ // related to the outer join's null supplier(s)
+ List<EvalNode> matched2 = Lists.newArrayList();
+ for (EvalNode eval : cnf) {
+
+ Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
+ Set<String> tableNames = Sets.newHashSet();
+ // getting distinct table references
+ for (Column col : columnRefs) {
+ if (!tableNames.contains(col.getQualifier())) {
+ tableNames.add(col.getQualifier());
+ }
+ }
+
+ //if the predicate involves any of the null suppliers
+ boolean shouldKeep=false;
+ Iterator<String> it2 = nullSuppliers.iterator();
+ while(it2.hasNext()){
+ if(tableNames.contains(it2.next()) == true) {
+ shouldKeep = true;
+ }
+ }
+
+ if(shouldKeep == true) {
+ matched2.add(eval);
+ }
+
+ }
+
+ //merge the retained predicates and establish them in the current outer join node. Then remove them from the cnf
+ EvalNode qual2 = null;
+ if (matched2.size() > 1) {
+ // merged into one eval tree
+ qual2 = EvalTreeUtil.transformCNF2Singleton(
+ matched2.toArray(new EvalNode [matched2.size()]));
+ } else if (matched2.size() == 1) {
+ // if the number of matched expr is one
+ qual2 = matched2.get(0);
+ }
+
+ if (qual2 != null) {
+ EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+ joinNode.setJoinQual(conjQual2);
+ cnf.removeAll(matched2);
+ } // for the remaining cnf, push it as usual
+ }
+ }
+
visitChild(plan, left, stack, cnf);
visitChild(plan, right, stack, cnf);
@@ -112,8 +220,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
if (qual != null) {
if (joinNode.hasJoinQual()) {
- EvalNode conjQual = EvalTreeUtil.
- transformCNF2Singleton(joinNode.getJoinQual(), qual);
+ EvalNode conjQual = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual);
joinNode.setJoinQual(conjQual);
} else {
joinNode.setJoinQual(qual);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 3e9d958..53a8449 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -385,4 +385,20 @@ public class TupleUtil {
}
return new TupleRange(target, startTuple, endTuple);
}
+
+ /**
+  * Builds a tuple of the requested width in which every field holds a NULL datum.
+  * Outer join algorithms are the usual callers.
+  *
+  * @param size the number of columns the new tuple should have
+  * @return a newly created tuple whose {@code size} fields are all NULL datums
+  */
+ public static Tuple createNullPaddedTuple(int size){
+   VTuple nullTuple = new VTuple(size);
+   for (int column = 0; column < size; column++) {
+     nullTuple.put(column, DatumFactory.createNullDatum());
+   }
+   return nullTuple;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
new file mode 100644
index 0000000..6fb7f61
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+ /**
+  * Tests for {@code FullOuterHashJoinExec}. Every case plans one FULL OUTER JOIN from
+  * {@link #QUERIES}, forces the in-memory hash join algorithm on the join node, runs the
+  * physical plan and checks how many rows it produces.
+  */
+ public class TestFullOuterHashJoinExec {
+   private TajoConf conf;
+   private final String TEST_PATH = "target/test-data/TestFullOuterHashJoinExec";
+   private TajoTestingCluster util;
+   private CatalogService catalog;
+   private SQLAnalyzer analyzer;
+   private LogicalPlanner planner;
+   private AbstractStorageManager sm;
+   private Path testDir;
+
+   private TableDesc dep3;
+   private TableDesc job3;
+   private TableDesc emp3;
+   private TableDesc phone3;
+
+
+   /**
+    * Starts a catalog cluster and writes the four CSV fixture tables (dep3, job3, emp3 and the
+    * empty phone3) under the test directory, registering each of them in the catalog.
+    */
+   @Before
+   public void setUp() throws Exception {
+     util = new TajoTestingCluster();
+     util.initTestDir();
+     catalog = util.startCatalogCluster().getCatalog();
+     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+     conf = util.getConfiguration();
+     sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+     //----------------- dep3 ------------------------------
+     // dep_id | dep_name | loc_id
+     //--------------------------------
+     // 0 | dept_0 | 1000
+     // 1 | dept_1 | 1001
+     // 2 | dept_2 | 1002
+     // 3 | dept_3 | 1003
+     // 4 | dept_4 | 1004
+     // 5 | dept_5 | 1005
+     // 6 | dept_6 | 1006
+     // 7 | dept_7 | 1007
+     // 8 | dept_8 | 1008
+     // 9 | dept_9 | 1009
+     Schema dep3Schema = new Schema();
+     dep3Schema.addColumn("dep_id", Type.INT4);
+     dep3Schema.addColumn("dep_name", Type.TEXT);
+     dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+     TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, StoreType.CSV);
+     Path dep3Path = new Path(testDir, "dep3.csv");
+     Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+     appender1.init();
+     Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+     for (int i = 0; i < 10; i++) {
+       tuple.put(new Datum[] { DatumFactory.createInt4(i),
+           DatumFactory.createText("dept_" + i),
+           DatumFactory.createInt4(1000 + i) });
+       appender1.addTuple(tuple);
+     }
+
+     appender1.flush();
+     appender1.close();
+     dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+     catalog.addTable(dep3);
+
+     //----------------- job3 ------------------------------
+     // job_id | job_title
+     // ----------------------
+     // 101 | job_101
+     // 102 | job_102
+     // 103 | job_103
+
+     Schema job3Schema = new Schema();
+     job3Schema.addColumn("job_id", Type.INT4);
+     job3Schema.addColumn("job_title", Type.TEXT);
+
+
+     TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+         StoreType.CSV);
+     Path job3Path = new Path(testDir, "job3.csv");
+     Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+     appender2.init();
+     Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+     for (int i = 1; i < 4; i++) {
+       int x = 100 + i;
+       tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+           DatumFactory.createText("job_" + x) });
+       appender2.addTuple(tuple2);
+     }
+
+     appender2.flush();
+     appender2.close();
+     job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+     catalog.addTable(job3);
+
+
+
+     //---------------------emp3 --------------------
+     // emp_id | first_name | last_name | dep_id | salary | job_id
+     // ------------------------------------------------------------
+     // 11 | firstname_11 | lastname_11 | 1 | 123 | 101
+     // 13 | firstname_13 | lastname_13 | 3 | 369 | 103
+     // 15 | firstname_15 | lastname_15 | 5 | 615 | null
+     // 17 | firstname_17 | lastname_17 | 7 | 861 | null
+     // 19 | firstname_19 | lastname_19 | 9 | 1107 | null
+     // 21 | firstname_21 | lastname_21 | 1 | 123 | 101
+     // 23 | firstname_23 | lastname_23 | 3 | 369 | 103
+
+     Schema emp3Schema = new Schema();
+     emp3Schema.addColumn("emp_id", Type.INT4);
+     emp3Schema.addColumn("first_name", Type.TEXT);
+     emp3Schema.addColumn("last_name", Type.TEXT);
+     emp3Schema.addColumn("dep_id", Type.INT4);
+     emp3Schema.addColumn("salary", Type.FLOAT4);
+     emp3Schema.addColumn("job_id", Type.INT4);
+
+
+     TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+     Path emp3Path = new Path(testDir, "emp3.csv");
+     Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+     appender3.init();
+     Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+     // employees 11/21 and 13/23: two per department, each with a non-null job_id
+     for (int i = 1; i < 4; i += 2) {
+       int x = 10 + i;
+       tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+           DatumFactory.createText("firstname_" + x),
+           DatumFactory.createText("lastname_" + x),
+           DatumFactory.createInt4(i),
+           DatumFactory.createFloat4(123 * i),
+           DatumFactory.createInt4(100 + i) });
+       appender3.addTuple(tuple3);
+
+       int y = 20 + i;
+       tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+           DatumFactory.createText("firstname_" + y),
+           DatumFactory.createText("lastname_" + y),
+           DatumFactory.createInt4(i),
+           DatumFactory.createFloat4(123 * i),
+           DatumFactory.createInt4(100 + i) });
+       appender3.addTuple(tuple3);
+     }
+
+     // employees 15, 17 and 19: job_id is a NULL datum
+     for (int i = 5; i < 10; i += 2) {
+       int x = 10 + i;
+       tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+           DatumFactory.createText("firstname_" + x),
+           DatumFactory.createText("lastname_" + x),
+           DatumFactory.createInt4(i),
+           DatumFactory.createFloat4(123 * i),
+           DatumFactory.createNullDatum() });
+       appender3.addTuple(tuple3);
+     }
+
+     appender3.flush();
+     appender3.close();
+     emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+     catalog.addTable(emp3);
+
+     //---------------------phone3 --------------------
+     // emp_id | phone_number
+     // -----------------------------------------------
+     // this table is empty, no rows
+
+     Schema phone3Schema = new Schema();
+     phone3Schema.addColumn("emp_id", Type.INT4);
+     phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+     TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+         StoreType.CSV);
+     Path phone3Path = new Path(testDir, "phone3.csv");
+     Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+     appender5.init();
+
+     appender5.flush();
+     appender5.close();
+     phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+     catalog.addTable(phone3);
+
+     analyzer = new SQLAnalyzer();
+     planner = new LogicalPlanner(catalog);
+   }
+
+   /** Shuts down the catalog cluster that {@link #setUp()} started. */
+   @After
+   public void tearDown() throws Exception {
+     util.shutdownCatalogCluster();
+   }
+
+   String[] QUERIES = {
+       // [0] no NULL join keys on either side
+       "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join emp3 on dep3.dep_id = emp3.dep_id",
+       // [1] NULL join keys (emp3.job_id) on the right operand
+       "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+       // [2] NULL join keys (emp3.job_id) on the left operand
+       "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+       // [3] the right operand (phone3) is empty
+       "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id"
+   };
+
+   /**
+    * Splits a fixture table into fragments, passing Integer.MAX_VALUE as the last argument of
+    * {@code StorageManager.splitNG}.
+    *
+    * @param tableName the name the fragments are tagged with
+    * @param table     the descriptor supplying the table's meta and path
+    * @return the fragments returned by {@code StorageManager.splitNG}
+    */
+   private Fragment[] fragmentsOf(String tableName, TableDesc table) throws IOException {
+     return StorageManager.splitNG(conf, tableName, table.getMeta(), table.getPath(), Integer.MAX_VALUE);
+   }
+
+   /**
+    * Plans {@code query}, enforces the in-memory hash join on its top join node, executes the
+    * physical plan over {@code fragments} and asserts the number of produced rows.
+    *
+    * @param query        the SQL text to plan
+    * @param fragments    the input fragments handed to the task attempt context
+    * @param workDirPath  the working directory path for this run
+    * @param expectedRows the number of rows the plan must produce
+    */
+   private void assertJoinRowCount(String query, Fragment[] fragments, String workDirPath, int expectedRows)
+       throws IOException, PlanningException {
+     Expr expr = analyzer.parse(query);
+     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+     Enforcer enforcer = new Enforcer();
+     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+     Path workDir = CommonTestingUtil.getTestDir(workDirPath);
+     TaskAttemptContext ctx = new TaskAttemptContext(conf,
+         LocalTajoTestingUtility.newQueryUnitAttemptId(), fragments, workDir);
+     ctx.setEnforcer(enforcer);
+
+     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+     // the enforced algorithm must show up directly below the projection
+     ProjectionExec proj = (ProjectionExec) exec;
+     assertTrue(proj.getChild() instanceof FullOuterHashJoinExec);
+
+     int count = 0;
+     exec.init();
+
+     while (exec.next() != null) {
+       //TODO check contents
+       count++;
+     }
+     // once exhausted, the operator must keep returning null
+     assertNull(exec.next());
+     exec.close();
+     assertEquals(expectedRows, count);
+   }
+
+   @Test
+   public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
+     Fragment[] merged = TUtil.concat(fragmentsOf("dep3", dep3), fragmentsOf("emp3", emp3));
+     // 7 employees each match one department; departments 0, 2, 4, 6 and 8 have no employee: 7 + 5
+     assertJoinRowCount(QUERIES[0], merged, "target/test-data/TestFullOuterHashJoinExec0", 12);
+   }
+
+
+   @Test
+   public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
+     Fragment[] merged = TUtil.concat(fragmentsOf("job3", job3), fragmentsOf("emp3", emp3));
+     // jobs 101 and 103 match two employees each (4), job 102 matches none (1),
+     // and three employees carry a NULL job_id (3)
+     assertJoinRowCount(QUERIES[1], merged, "target/test-data/TestFullOuterHashJoinExec1", 8);
+   }
+
+   @Test
+   public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
+     Fragment[] merged = TUtil.concat(fragmentsOf("emp3", emp3), fragmentsOf("job3", job3));
+     // same data as QUERIES[1] with the operands swapped
+     assertJoinRowCount(QUERIES[2], merged, "target/test-data/TestFullOuterHashJoinExec2", 8);
+   }
+
+
+   @Test
+   public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
+     Fragment[] merged = TUtil.concat(fragmentsOf("emp3", emp3), fragmentsOf("phone3", phone3));
+     // phone3 is empty, so each of the 7 employees appears exactly once
+     assertJoinRowCount(QUERIES[3], merged, "target/test-data/TestFullOuterHashJoinExec3", 7);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
new file mode 100644
index 0000000..5fde9b8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestFullOuterMergeJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestFullOuterMergeJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private AbstractStorageManager sm;
+ private Path testDir;
+ private static final int UNGENERATED_PID = -1;
+
+ private TableDesc dep3;
+ private TableDesc dep4;
+ private TableDesc job3;
+ private TableDesc emp3;
+ private TableDesc phone3;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dept_0 | 1000
+ // 1 | dept_1 | 1001
+ // 2 | dept_2 | 1002
+ // 3 | dept_3 | 1003
+ // 4 | dept_4 | 1004
+ // 5 | dept_5 | 1005
+ // 6 | dept_6 | 1006
+ // 7 | dept_7 | 1007
+ // 8 | dept_8 | 1008
+ // 9 | dept_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, StoreType.CSV);
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+ appender1.init();
+ Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+ catalog.addTable(dep3);
+
+
+ //----------------- dep4 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dept_0 | 1000
+ // 1 | dept_1 | 1001
+ // 2 | dept_2 | 1002
+ // 3 | dept_3 | 1003
+ // 4 | dept_4 | 1004
+ // 5 | dept_5 | 1005
+ // 6 | dept_6 | 1006
+ // 7 | dept_7 | 1007
+ // 8 | dept_8 | 1008
+ // 9 | dept_9 | 1009
+ // 10 | dept_10 | 1010
+ Schema dep4Schema = new Schema();
+ dep4Schema.addColumn("dep_id", Type.INT4);
+ dep4Schema.addColumn("dep_name", Type.TEXT);
+ dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep4Meta = CatalogUtil.newTableMeta(dep4Schema,
+ StoreType.CSV);
+ Path dep4Path = new Path(testDir, "dep4.csv");
+ Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Path);
+ appender4.init();
+ Tuple tuple4 = new VTuple(dep4Meta.getSchema().getColumnNum());
+ for (int i = 0; i < 11; i++) {
+ tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender4.addTuple(tuple4);
+ }
+
+ appender4.flush();
+ appender4.close();
+ dep4 = CatalogUtil.newTableDesc("dep4", dep4Meta, dep4Path);
+ catalog.addTable(dep4);
+
+
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+ StoreType.CSV);
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+ appender2.init();
+ Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+ catalog.addTable(job3);
+
+
+
+ //---------------------emp3 --------------------
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | firstname_11 | lastname_11 | 1 | 123 | 101
+ // 13 | firstname_13 | lastname_13 | 3 | 369 | 103
+ // 15 | firstname_15 | lastname_15 | 5 | 615 | null
+ // 17 | firstname_17 | lastname_17 | 7 | 861 | null
+ // 19 | firstname_19 | lastname_19 | 9 | 1107 | null
+ // 21 | firstname_21 | lastname_21 | 1 | 123 | 101
+ // 23 | firstname_23 | lastname_23 | 3 | 369 | 103
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+ appender3.init();
+ Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+ catalog.addTable(emp3);
+
+ //---------------------phone3 --------------------
+ // emp_id | phone_number
+ // -----------------------------------------------
+ // this table is empty: the appender is opened and closed without adding any tuple
+
+ Schema phone3Schema = new Schema();
+ phone3Schema.addColumn("emp_id", Type.INT4);
+ phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+ TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+ StoreType.CSV);
+ Path phone3Path = new Path(testDir, "phone3.csv");
+ Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+ appender5.init();
+ appender5.flush();
+ appender5.close();
+ phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+ catalog.addTable(phone3);
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster(); // stops the catalog cluster that setUp() started through util.startCatalogCluster()
+ }
+
+ String[] QUERIES = {
+ // [0] no NULL join keys on either side (dep_id is always set in emp3 and dep3)
+ "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join dep3 on dep3.dep_id = emp3.dep_id",
+ // [1] NULL join keys (emp3.job_id) on the left operand
+ "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+ // [2] NULL join keys (emp3.job_id) on the right operand
+ "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+ // [3] no NULL join keys; the right operand (dep4) still has dep_id 10 after the largest emp3.dep_id (9)
+ "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join dep4 on dep4.dep_id = emp3.dep_id",
+ // [4] the right operand (phone3) is empty
+ "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id",
+ // [5] the left operand (phone3) is empty
+ "select emp3.emp_id, first_name, phone_number from phone3 full outer join emp3 on emp3.emp_id = phone3.emp_id",
+ };
+
+ /**
+  * Runs QUERIES[0] (emp3 full outer join dep3) with the merge join algorithm enforced and
+  * expects 12 result rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
+   // build the logical plan and pin its top join node to MERGE_JOIN
+   LogicalNode logicalRoot = planner.createPlan(analyzer.parse(QUERIES[0])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(logicalRoot, NodeType.JOIN);
+   Enforcer mergeJoinEnforcer = new Enforcer();
+   mergeJoinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // input fragments of both operands, left table first
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] depSplits = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+   Fragment[] inputs = TUtil.concat(empSplits, depSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputs, taskDir);
+   taskCtx.setEnforcer(mergeJoinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, logicalRoot);
+
+   // the enforced algorithm must sit directly below the projection
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   int produced = 0;
+   root.init();
+   while (root.next() != null) {
+     //TODO check contents
+     produced += 1;
+   }
+
+   // once exhausted, the operator must keep returning null
+   assertNull(root.next());
+   root.close();
+   assertEquals(12, produced);
+ }
+
+
+ /**
+  * Plans QUERIES[1] (emp3 full outer join job3, nulls on the left operand) with the
+  * merge-join algorithm enforced, checks that the projection sits on a
+  * FullOuterMergeJoinExec and expects 8 rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to MERGE_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[1])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // One fragment array per input table; job3 fragments are placed first in the task input.
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] jobSplits = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(jobSplits, empSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   // Once exhausted, the operator must keep returning null.
+   assertNull(root.next());
+   root.close();
+   assertEquals(8, rows);
+ }
+
+ /**
+  * Plans QUERIES[2] (job3 full outer join emp3, nulls on the right side) with the
+  * merge-join algorithm enforced, checks that the projection sits on a
+  * FullOuterMergeJoinExec and expects 8 rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to MERGE_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[2])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // One fragment array per input table; job3 fragments are placed first in the task input.
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] jobSplits = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(jobSplits, empSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   // Once exhausted, the operator must keep returning null.
+   assertNull(root.next());
+   root.close();
+   assertEquals(8, rows);
+ }
+
+ /**
+  * Plans QUERIES[3] (emp3 full outer join dep4) with the merge-join algorithm enforced,
+  * checks that the projection sits on a FullOuterMergeJoinExec and expects 13 rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to MERGE_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[3])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // One fragment array per input table, concatenated into the task input.
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] depSplits = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(empSplits, depSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+
+   // The enforced algorithm must be honoured: the projection's child has to be the merge join.
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   // Once exhausted, the operator must keep returning null.
+   assertNull(root.next());
+   root.close();
+   assertEquals(13, rows);
+ }
+
+
+ /**
+  * Plans QUERIES[4] (emp3 full outer join phone3) with the merge-join algorithm
+  * enforced, checks that the projection sits on a FullOuterMergeJoinExec and
+  * expects 7 rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to MERGE_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[4])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // One fragment array per input table, concatenated into the task input.
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] phoneSplits = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(empSplits, phoneSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   // Once exhausted, the operator must keep returning null.
+   assertNull(root.next());
+   root.close();
+   assertEquals(7, rows);
+ }
+
+
+ /**
+  * Plans QUERIES[5] (phone3 full outer join emp3) with the merge-join algorithm
+  * enforced, checks that the projection sits on a FullOuterMergeJoinExec and
+  * expects 7 rows.
+  */
+ @Test
+ public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to MERGE_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[5])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+   // One fragment array per input table; phone3 fragments are placed first in the task input.
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] phoneSplits = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(phoneSplits, empSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+   assertTrue(((ProjectionExec) root).getChild() instanceof FullOuterMergeJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   // Once exhausted, the operator must keep returning null.
+   assertNull(root.next());
+   root.close();
+   assertEquals(7, rows);
+ }
+
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
new file mode 100644
index 0000000..5486e8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.LocalTajoTestingUtility;
+
+import java.io.IOException;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLeftOuterHashJoinExec {
+ // Configuration taken from the testing cluster in setUp().
+ private TajoConf conf;
+ // Directory under which setUp() creates the CSV files backing the test tables.
+ private final String TEST_PATH = "target/test-data/TestLeftOuterHashJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ // Storage manager handed to PhysicalPlannerImpl by the tests.
+ private AbstractStorageManager sm;
+ private Path testDir;
+
+ // Table descriptors created and registered in the catalog by setUp().
+ private TableDesc dep3;
+ private TableDesc job3;
+ private TableDesc emp3;
+ private TableDesc phone3;
+
+ /**
+  * Starts a catalog cluster and creates four CSV tables under the test directory
+  * (dep3, job3, emp3 and the empty phone3), registering each one in the catalog,
+  * then creates the SQL analyzer and the logical planner used by the tests.
+  */
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dept_0 | 1000
+ // 1 | dept_1 | 1001
+ // 2 | dept_2 | 1002
+ // 3 | dept_3 | 1003
+ // 4 | dept_4 | 1004
+ // 5 | dept_5 | 1005
+ // 6 | dept_6 | 1006
+ // 7 | dept_7 | 1007
+ // 8 | dept_8 | 1008
+ // 9 | dept_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema,
+ StoreType.CSV);
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path);
+ appender1.init();
+ // A single tuple instance is refilled and appended once per row.
+ Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path);
+ catalog.addTable(dep3);
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema,
+ StoreType.CSV);
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path);
+ appender2.init();
+ Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path);
+ catalog.addTable(job3);
+
+
+
+ //---------------------emp3 --------------------
+ // (rows listed in the order in which they are written)
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | firstname_11 | lastname_11 | 1 | 123 | 101
+ // 21 | firstname_21 | lastname_21 | 1 | 123 | 101
+ // 13 | firstname_13 | lastname_13 | 3 | 369 | 103
+ // 23 | firstname_23 | lastname_23 | 3 | 369 | 103
+ // 15 | firstname_15 | lastname_15 | 5 | 615 | null
+ // 17 | firstname_17 | lastname_17 | 7 | 861 | null
+ // 19 | firstname_19 | lastname_19 | 9 | 1107 | null
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV);
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path);
+ appender3.init();
+ Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum());
+
+ // i = 1, 3: two employees per department (ids 10 + i and 20 + i), both with job_id 100 + i.
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ // i = 5, 7, 9: one employee per department, written with a null job_id.
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path);
+ catalog.addTable(emp3);
+
+ //---------------------phone3 --------------------
+ // emp_id | phone_number
+ // -----------------------------------------------
+ // this table is empty, no rows
+
+ Schema phone3Schema = new Schema();
+ phone3Schema.addColumn("emp_id", Type.INT4);
+ phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+ TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema,
+ StoreType.CSV);
+ Path phone3Path = new Path(testDir, "phone3.csv");
+ Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path);
+ appender5.init();
+
+ // No tuples are added: the appender is only opened, flushed and closed.
+ appender5.flush();
+ appender5.close();
+ phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path);
+ catalog.addTable(phone3);
+
+
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ /** Shuts down the catalog cluster started in setUp() after each test method. */
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ // SQL statements exercised by the tests below; the test whose name ends in N uses QUERIES[N].
+ String[] QUERIES = {
+ // [0] no nulls
+ "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id",
+ // [1] nulls on the right operand
+ "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id",
+ // [2] nulls on the left side
+ "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id",
+ // [3] one operand is empty (phone3 is the right operand)
+ "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id",
+ // [4] one operand is empty (phone3 is the left operand)
+ "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id"
+ };
+
+ /**
+  * Plans QUERIES[0] (dep3 left outer join emp3) with the in-memory hash-join algorithm
+  * enforced, checks that the projection sits on a LeftOuterHashJoinExec and expects
+  * 12 rows.
+  */
+ @Test
+ public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
+   // Build the logical plan and pin its top-most join to IN_MEMORY_HASH_JOIN.
+   LogicalNode rootNode = planner.createPlan(analyzer.parse(QUERIES[0])).getRootBlock().getRoot();
+   JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+   Enforcer joinEnforcer = new Enforcer();
+   joinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+   // One fragment array per input table, concatenated into the task input.
+   Fragment[] depSplits = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+   Fragment[] empSplits = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+   Fragment[] inputFrags = TUtil.concat(depSplits, empSplits);
+
+   Path taskDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
+   TaskAttemptContext taskCtx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), inputFrags, taskDir);
+   taskCtx.setEnforcer(joinEnforcer);
+
+   PhysicalExec root = new PhysicalPlannerImpl(conf, sm).createPlan(taskCtx, rootNode);
+   assertTrue(((ProjectionExec) root).getChild() instanceof LeftOuterHashJoinExec);
+
+   // Drain the operator. TODO: also verify the tuple contents, not only the row count.
+   int rows = 0;
+   root.init();
+   while (root.next() != null) {
+     rows++;
+   }
+   root.close();
+   assertEquals(12, rows);
+ }
+
+
+ /**
+  * Plans QUERIES[1] (job3 left outer join emp3) without enforcing a join algorithm and
+  * expects 5 result rows.
+  *
+  * <p>No Enforcer is set on the context, so the physical planner picks the join
+  * implementation itself; the test fails if it picks LeftOuterNLJoinExec.
+  */
+ @Test
+ public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
+   Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+       Integer.MAX_VALUE);
+
+   Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+   Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
+   TaskAttemptContext ctx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+   Expr expr = analyzer.parse(QUERIES[1]);
+   LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+   ProjectionExec proj = (ProjectionExec) exec;
+   // For this small data set the planner is not expected to choose the nested-loop join.
+   // Fail with an explicit message instead of the former assertEquals(1, 0).
+   assertTrue("planner unexpectedly chose LeftOuterNLJoinExec",
+       !(proj.getChild() instanceof LeftOuterNLJoinExec));
+
+   int count = 0;
+   exec.init();
+   try {
+     while (exec.next() != null) {
+       // TODO check contents
+       count++;
+     }
+   } finally {
+     // Release the operator even when iteration throws.
+     exec.close();
+   }
+   assertEquals(5, count);
+ }
+
+ /**
+  * Plans QUERIES[2] (emp3 left outer join job3) without enforcing a join algorithm and
+  * expects 7 result rows.
+  *
+  * <p>No Enforcer is set on the context, so the physical planner picks the join
+  * implementation itself; the test fails if it picks LeftOuterNLJoinExec.
+  */
+ @Test
+ public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
+   Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+       Integer.MAX_VALUE);
+
+   Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+   Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
+   TaskAttemptContext ctx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+   Expr expr = analyzer.parse(QUERIES[2]);
+   LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+   ProjectionExec proj = (ProjectionExec) exec;
+   // For this small data set the planner is not expected to choose the nested-loop join.
+   // Fail with an explicit message instead of the former assertEquals(1, 0).
+   assertTrue("planner unexpectedly chose LeftOuterNLJoinExec",
+       !(proj.getChild() instanceof LeftOuterNLJoinExec));
+
+   int count = 0;
+   exec.init();
+   try {
+     while (exec.next() != null) {
+       // TODO check contents
+       count++;
+     }
+   } finally {
+     // Release the operator even when iteration throws.
+     exec.close();
+   }
+   assertEquals(7, count);
+ }
+
+
+ /**
+  * Plans QUERIES[3] (emp3 left outer join the empty phone3) without enforcing a join
+  * algorithm and expects 7 result rows.
+  *
+  * <p>No Enforcer is set on the context, so the physical planner picks the join
+  * implementation itself; the test fails if it picks LeftOuterNLJoinExec.
+  */
+ @Test
+ public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
+   Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+
+   Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+   Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
+   TaskAttemptContext ctx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+   Expr expr = analyzer.parse(QUERIES[3]);
+   LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+   ProjectionExec proj = (ProjectionExec) exec;
+   // For this small data set the planner is not expected to choose the nested-loop join.
+   // Fail with an explicit message instead of the former assertEquals(1, 0).
+   assertTrue("planner unexpectedly chose LeftOuterNLJoinExec",
+       !(proj.getChild() instanceof LeftOuterNLJoinExec));
+
+   int count = 0;
+   exec.init();
+   try {
+     while (exec.next() != null) {
+       // TODO check contents
+       count++;
+     }
+   } finally {
+     // Release the operator even when iteration throws.
+     exec.close();
+   }
+   assertEquals(7, count);
+ }
+
+
+ /**
+  * Plans QUERIES[4] (the empty phone3 left outer join emp3) without enforcing a join
+  * algorithm and expects no result rows.
+  *
+  * <p>No Enforcer is set on the context, so the physical planner picks the join
+  * implementation itself; the test fails if it picks LeftOuterNLJoinExec.
+  */
+ @Test
+ public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
+   Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+       Integer.MAX_VALUE);
+   Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+       Integer.MAX_VALUE);
+
+   Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+   Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
+   TaskAttemptContext ctx = new TaskAttemptContext(conf,
+       LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+   Expr expr = analyzer.parse(QUERIES[4]);
+   LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+   ProjectionExec proj = (ProjectionExec) exec;
+   // For this small data set the planner is not expected to choose the nested-loop join.
+   // Fail with an explicit message instead of the former assertEquals(1, 0).
+   assertTrue("planner unexpectedly chose LeftOuterNLJoinExec",
+       !(proj.getChild() instanceof LeftOuterNLJoinExec));
+
+   int count = 0;
+   exec.init();
+   try {
+     while (exec.next() != null) {
+       // TODO check contents
+       count++;
+     }
+   } finally {
+     // Release the operator even when iteration throws.
+     exec.close();
+   }
+   assertEquals(0, count);
+ }
+
+
+
+}
+ //--camelia
[3/3] git commit: TAJO-216: Improve FilterPushDownRule and Implement
physical operators for outer join. (camelia_c via hyunsik)
Posted by hy...@apache.org.
TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3e6d684a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3e6d684a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3e6d684a
Branch: refs/heads/master
Commit: 3e6d684a9f1c7592c99d359c376956fcc9e917ba
Parents: 406337d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 2 10:29:03 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 2 10:32:23 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/tajo/datum/CharDatum.java | 23 +-
.../java/org/apache/tajo/datum/Float4Datum.java | 92 ++--
.../java/org/apache/tajo/datum/Float8Datum.java | 64 ++-
.../java/org/apache/tajo/datum/Int2Datum.java | 65 ++-
.../java/org/apache/tajo/datum/Int4Datum.java | 58 ++-
.../java/org/apache/tajo/datum/Int8Datum.java | 63 ++-
.../java/org/apache/tajo/datum/NullDatum.java | 2 +-
.../java/org/apache/tajo/datum/TextDatum.java | 27 +-
.../engine/planner/PhysicalPlannerImpl.java | 178 ++++++-
.../planner/physical/FullOuterHashJoinExec.java | 252 +++++++++
.../physical/FullOuterMergeJoinExec.java | 334 ++++++++++++
.../planner/physical/LeftOuterHashJoinExec.java | 214 ++++++++
.../planner/physical/LeftOuterNLJoinExec.java | 129 +++++
.../physical/RightOuterMergeJoinExec.java | 343 ++++++++++++
.../planner/rewrite/FilterPushDownRule.java | 111 +++-
.../org/apache/tajo/engine/utils/TupleUtil.java | 16 +
.../physical/TestFullOuterHashJoinExec.java | 394 ++++++++++++++
.../physical/TestFullOuterMergeJoinExec.java | 516 +++++++++++++++++++
.../physical/TestLeftOuterHashJoinExec.java | 450 ++++++++++++++++
.../physical/TestLeftOuterNLJoinExec.java | 459 +++++++++++++++++
.../planner/physical/TestMergeJoinExec.java | 2 +-
.../physical/TestRightOuterHashJoinExec.java | 342 ++++++++++++
.../physical/TestRightOuterMergeJoinExec.java | 506 ++++++++++++++++++
24 files changed, 4518 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a148c20..e802376 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.2.0 - unreleased
NEW FEATURES
+ TAJO-216: Improve FilterPushDownRule and Implement physical operators
+ for outer join. (camelia_c via hyunsik)
+
TAJO-211: Implement regexp_replace function. (hyunsik)
TAJO-212: Implement type cast expresion. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
index 9ce9089..651b00b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
@@ -18,6 +18,7 @@
package org.apache.tajo.datum;
+import com.google.common.primitives.UnsignedBytes;
import com.google.gson.annotations.Expose;
import org.apache.tajo.datum.exception.InvalidOperationException;
@@ -135,10 +136,14 @@ public class CharDatum extends Datum {
@Override
public BooleanDatum equalsTo(Datum datum) {
switch (datum.type()) {
- case CHAR:
- return DatumFactory.createBool(this.equals(datum));
- default:
- throw new InvalidOperationException(datum.type());
+ case CHAR:
+ return DatumFactory.createBool(this.equals(datum));
+
+ case NULL:
+ return DatumFactory.createBool(false);
+
+ default:
+ throw new InvalidOperationException();
}
}
@@ -147,9 +152,13 @@ public class CharDatum extends Datum {
switch (datum.type()) {
case CHAR:
CharDatum other = (CharDatum) datum;
- return this.getString().compareTo(other.getString());
- default:
- throw new InvalidOperationException(datum.type());
+ return UnsignedBytes.lexicographicalComparator().compare(bytes, other.bytes);
+
+ case NULL:
+ return -1;
+
+ default:
+ throw new InvalidOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
index 97e608a..8de20ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
@@ -33,7 +33,7 @@ public class Float4Datum extends Datum implements NumericDatum {
public Float4Datum() {
super(TajoDataTypes.Type.FLOAT4);
}
-
+
public Float4Datum(float val) {
this();
this.val = val;
@@ -44,7 +44,7 @@ public class Float4Datum extends Datum implements NumericDatum {
ByteBuffer bb = ByteBuffer.wrap(bytes);
this.val = bb.getFloat();
}
-
+
public boolean asBool() {
throw new InvalidCastException();
}
@@ -53,7 +53,7 @@ public class Float4Datum extends Datum implements NumericDatum {
public char asChar() {
return asChars().charAt(0);
}
-
+
@Override
public short asInt2() {
return (short) val;
@@ -105,7 +105,7 @@ public class Float4Datum extends Datum implements NumericDatum {
public int size() {
return size;
}
-
+
@Override
public int hashCode() {
return (int) val;
@@ -117,73 +117,87 @@ public class Float4Datum extends Datum implements NumericDatum {
Float4Datum other = (Float4Datum) obj;
return val == other.val;
}
-
+
return false;
}
@Override
public BooleanDatum equalsTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- return DatumFactory.createBool(val == datum.asInt2());
- case INT4:
- return DatumFactory.createBool(val == datum.asInt4());
- case INT8:
- return DatumFactory.createBool(val == datum.asInt8());
- case FLOAT4:
- return DatumFactory.createBool(val == datum.asFloat4());
- case FLOAT8:
- return DatumFactory.createBool(val == datum.asFloat8());
- default:
- throw new InvalidOperationException(datum.type());
+ case INT2:
+ return DatumFactory.createBool(val == datum.asInt2());
+ case INT4:
+ return DatumFactory.createBool(val == datum.asInt4());
+ case INT8:
+ return DatumFactory.createBool(val == datum.asInt8());
+ case FLOAT4:
+ return DatumFactory.createBool(val == datum.asFloat4());
+ case FLOAT8:
+ return DatumFactory.createBool(val == datum.asFloat8());
+ case NULL:
+ return DatumFactory.createBool(false);
+ default:
+ throw new InvalidOperationException();
}
}
@Override
public int compareTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- if (val < datum.asInt2()) {
+ case INT2: {
+ short another = datum.asInt2();
+ if (val < another) {
return -1;
- } else if (datum.asInt2() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT4:
- if (val < datum.asInt4()) {
+ }
+ case INT4: {
+ int another = datum.asInt4();
+ if (val < another) {
return -1;
- } else if (datum.asInt4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT8:
- if (val < datum.asInt8()) {
+ }
+ case INT8: {
+ long another = datum.asInt8();
+ if (val < another) {
return -1;
- } else if (datum.asInt8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT4:
- if (val < datum.asFloat4()) {
+ }
+ case FLOAT4: {
+ float another = datum.asFloat4();
+ if (val < another) {
return -1;
- } else if (datum.asFloat4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT8:
- if (val < datum.asFloat8()) {
+ }
+ case FLOAT8: {
+ double another = datum.asFloat8();
+ if (val < another) {
return -1;
- } else if (datum.asFloat8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
+ }
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -200,6 +214,8 @@ public class Float4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val + datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val + datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -218,6 +234,8 @@ public class Float4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val - datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val - datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -236,6 +254,8 @@ public class Float4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val * datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val * datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException();
}
@@ -254,6 +274,8 @@ public class Float4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -272,6 +294,8 @@ public class Float4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -279,6 +303,6 @@ public class Float4Datum extends Datum implements NumericDatum {
@Override
public void inverseSign() {
- this.val = - val;
+ this.val = - val;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
index d531942..1857929 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
@@ -28,10 +28,16 @@ import org.apache.tajo.util.NumberUtil;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
public class Float8Datum extends Datum implements NumericDatum {
private static final int size = 8;
@Expose private double val;
+ private static final Log LOG = LogFactory.getLog(Float8Datum.class);
+
public Float8Datum() {
super(TajoDataTypes.Type.FLOAT8);
}
@@ -126,56 +132,70 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createBool(val == datum.asFloat4());
case FLOAT8:
return DatumFactory.createBool(val == datum.asFloat8());
+ case NULL:
+ return DatumFactory.createBool(false);
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@Override
public int compareTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- if (val < datum.asInt2()) {
+ case INT2: {
+ short another = datum.asInt2();
+ if (val < another) {
return -1;
- } else if (datum.asInt2() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT4:
- if (val < datum.asInt4()) {
+ }
+ case INT4: {
+ int another = datum.asInt4();
+ if (val < another) {
return -1;
- } else if (datum.asInt4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT8:
- if (val < datum.asInt8()) {
+ }
+ case INT8:{
+ long another = datum.asInt8();
+ if (val < another) {
return -1;
- } else if (datum.asInt8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT4:
- if (val < datum.asFloat4()) {
+ }
+ case FLOAT4: {
+ float another = datum.asFloat4();
+ if (val < another) {
return -1;
- } else if (datum.asFloat4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT8:
- if (val < datum.asFloat8()) {
+ }
+ case FLOAT8: {
+ double another = datum.asFloat8();
+ if (val < another) {
return -1;
- } else if (datum.asFloat8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
+ }
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -192,6 +212,8 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val + datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val + datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -210,6 +232,8 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val - datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val - datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -228,6 +252,8 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val * datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val * datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException();
}
@@ -246,6 +272,8 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -264,6 +292,8 @@ public class Float8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val % datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val % datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
index b34031f..000a107 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
@@ -25,10 +25,17 @@ import org.apache.tajo.util.NumberUtil;
import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
public class Int2Datum extends Datum implements NumericDatum {
private static final int size = 2;
@Expose private short val;
+ private static final Log LOG = LogFactory.getLog(Int2Datum.class);
+
public Int2Datum() {
super(TajoDataTypes.Type.INT2);
}
@@ -124,56 +131,70 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createBool(val == datum.asFloat4());
case FLOAT8:
return DatumFactory.createBool(val == datum.asFloat8());
+ case NULL:
+ return DatumFactory.createBool(false);
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@Override
public int compareTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- if (val < datum.asInt2()) {
+ case INT2: {
+ short another = datum.asInt2();
+ if (val < another) {
return -1;
- } else if (datum.asInt2() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT4:
- if (val < datum.asInt4()) {
+ }
+ case INT4: {
+ int another = datum.asInt4();
+ if (val < another) {
return -1;
- } else if (datum.asInt4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT8:
- if (val < datum.asInt8()) {
+ }
+ case INT8: {
+ long another = datum.asInt8();
+ if (val < another) {
return -1;
- } else if (datum.asInt8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT4:
- if (val < datum.asFloat4()) {
+ }
+ case FLOAT4: {
+ float another = datum.asFloat4();
+ if (val < another) {
return -1;
- } else if (datum.asFloat4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT8:
- if (val < datum.asFloat8()) {
+ }
+ case FLOAT8: {
+ double another = datum.asFloat8();
+ if (val < another) {
return -1;
- } else if (datum.asFloat8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
+ }
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -190,6 +211,8 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val + datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val + datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -208,6 +231,8 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val - datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val - datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -226,6 +251,8 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val * datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val * datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -244,6 +271,8 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -262,6 +291,8 @@ public class Int2Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val % datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val % datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
index 1422187..a68a5f1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
@@ -20,12 +20,12 @@ package org.apache.tajo.datum;
import com.google.gson.annotations.Expose;
import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.exception.InvalidCastException;
import org.apache.tajo.datum.exception.InvalidOperationException;
import org.apache.tajo.util.NumberUtil;
import java.nio.ByteBuffer;
+
public class Int4Datum extends Datum implements NumericDatum {
private static final int size = 4;
@Expose private int val;
@@ -129,6 +129,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createBool(val == datum.asFloat4());
case FLOAT8:
return DatumFactory.createBool(val == datum.asFloat8());
+ case NULL:
+ return DatumFactory.createBool(false);
default:
throw new InvalidOperationException();
}
@@ -137,48 +139,60 @@ public class Int4Datum extends Datum implements NumericDatum {
@Override
public int compareTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- if (val < datum.asInt2()) {
+ case INT2: {
+ short another = datum.asInt2();
+ if (val < another) {
return -1;
- } else if (datum.asInt2() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT4:
- if (val < datum.asInt4()) {
+ }
+ case INT4: {
+ int another = datum.asInt4();
+ if (val < another) {
return -1;
- } else if (datum.asInt4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT8:
- if (val < datum.asInt8()) {
+ }
+ case INT8: {
+ long another = datum.asInt8();
+ if (val < another) {
return -1;
- } else if (datum.asInt8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT4:
- if (val < datum.asFloat4()) {
+ }
+ case FLOAT4:{
+ float another = datum.asFloat4();
+ if (val < another) {
return -1;
- } else if (datum.asFloat4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT8:
- if (val < datum.asFloat8()) {
+ }
+ case FLOAT8: {
+ double another = datum.asFloat8();
+ if (val < another) {
return -1;
- } else if (datum.asFloat8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
+ }
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -195,6 +209,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val + datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val + datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -213,6 +229,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val - datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val - datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -231,6 +249,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val * datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val * datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException();
}
@@ -249,6 +269,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -267,6 +289,8 @@ public class Int4Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat4(val % datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val % datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
index 689adbc..7e8bd27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
@@ -26,6 +26,11 @@ import org.apache.tajo.util.NumberUtil;
import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
public class Int8Datum extends Datum implements NumericDatum {
private static final int size = 8;
@Expose private long val;
@@ -135,56 +140,70 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createBool(val == datum.asFloat4());
case FLOAT8:
return DatumFactory.createBool(val == datum.asFloat8());
+ case NULL:
+ return DatumFactory.createBool(false);
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@Override
public int compareTo(Datum datum) {
switch (datum.type()) {
- case INT2:
- if (val < datum.asInt2()) {
+ case INT2: {
+ short another = datum.asInt2();
+ if (val < another) {
return -1;
- } else if (datum.asInt2() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT4:
- if (val < datum.asInt4()) {
+ }
+ case INT4: {
+ int another = datum.asInt4();
+ if (val < another) {
return -1;
- } else if (datum.asInt4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case INT8:
- if (val < datum.asInt8()) {
+ }
+ case INT8: {
+ long another = datum.asInt8();
+ if (val < another) {
return -1;
- } else if (datum.asInt8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT4:
- if (val < datum.asFloat4()) {
+ }
+ case FLOAT4: {
+ float another = datum.asFloat4();
+ if (val < another) {
return -1;
- } else if (datum.asFloat4() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
- case FLOAT8:
- if (val < datum.asFloat8()) {
+ }
+ case FLOAT8:{
+ double another = datum.asFloat8();
+ if (val < another) {
return -1;
- } else if (datum.asFloat8() < val) {
+ } else if (val > another) {
return 1;
} else {
return 0;
}
+ }
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -201,6 +220,8 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val + datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val + datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -219,6 +240,8 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val - datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val - datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -237,6 +260,8 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val * datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val * datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -255,6 +280,8 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val / datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val / datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
@@ -273,6 +300,8 @@ public class Int8Datum extends Datum implements NumericDatum {
return DatumFactory.createFloat8(val % datum.asFloat4());
case FLOAT8:
return DatumFactory.createFloat8(val % datum.asFloat8());
+ case NULL:
+ return datum;
default:
throw new InvalidOperationException(datum.type());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
index 10c35a9..2cce626 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
@@ -104,4 +104,4 @@ public class NullDatum extends Datum {
public int hashCode() {
return 0; // one of the prime number
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index bab99a2..a155f1a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -18,6 +18,7 @@
package org.apache.tajo.datum;
+import com.google.common.primitives.UnsignedBytes;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.io.WritableComparator;
import org.apache.tajo.common.TajoDataTypes;
@@ -26,6 +27,9 @@ import org.apache.tajo.datum.exception.InvalidOperationException;
import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
public class TextDatum extends Datum {
@Expose private int size;
@Expose private byte [] bytes;
@@ -102,11 +106,14 @@ public class TextDatum extends Datum {
public int compareTo(Datum datum) {
switch (datum.type()) {
case TEXT:
- byte[] o = datum.asByteArray();
- return WritableComparator.compareBytes(this.bytes, 0, this.bytes.length,
- o, 0, o.length);
+ case CHAR:
+ case BLOB:
+ return UnsignedBytes.lexicographicalComparator().compare(bytes, datum.asByteArray());
+
+ case NULL:
+ return -1;
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
@@ -114,7 +121,7 @@ public class TextDatum extends Datum {
public boolean equals(Object obj) {
if (obj instanceof TextDatum) {
TextDatum o = (TextDatum) obj;
- return Arrays.equals(this.bytes, o.bytes);
+ return UnsignedBytes.lexicographicalComparator().compare(this.bytes, o.bytes) == 0;
}
return false;
@@ -124,10 +131,14 @@ public class TextDatum extends Datum {
public BooleanDatum equalsTo(Datum datum) {
switch (datum.type()) {
case TEXT:
- return DatumFactory.createBool(
- Arrays.equals(this.bytes, datum.asByteArray()));
+ case CHAR:
+ case BLOB:
+ return DatumFactory.createBool(UnsignedBytes.lexicographicalComparator()
+ .compare(bytes, datum.asByteArray()) == 0);
+ case NULL:
+ return DatumFactory.createBool(false);
default:
- throw new InvalidOperationException(datum.type());
+ throw new InvalidOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 8110a3b..c6e4695 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -57,6 +57,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
protected final TajoConf conf;
protected final AbstractStorageManager sm;
+ final long threshold = 1048576 * 128; // 128MB
+
public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
this.conf = conf;
@@ -194,15 +196,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
switch (joinNode.getJoinType()) {
case CROSS:
- LOG.info("The planner chooses [Nested Loop Join]");
return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
case INNER:
return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
- case FULL_OUTER:
case LEFT_OUTER:
+ return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
case RIGHT_OUTER:
+ return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+ case FULL_OUTER:
+ return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_SEMI:
case RIGHT_SEMI:
@@ -215,6 +221,171 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
+ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+ switch (algorithm) {
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+ return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ case NESTED_LOOP_JOIN:
+ // the enforcer explicitly requested the nested-loop implementation of left outer join
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+ return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+ default:
+ LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+ return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ }
+ } else {
+ return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ long rightTableVolume = estimateSizeRecursive(context, rightLineage);
+
+ if (rightTableVolume < threshold) {
+ // we can implement left outer join using hash join, using the right operand as the build relation
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+ return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+ }
+ else {
+ //the right operand is too large, so we opt for NL implementation of left outer join
+ LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+ return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
+ // blocking, but merge join is blocking as well)
+ String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+ long outerSize4 = estimateSizeRecursive(context, outerLineage4);
+ if (outerSize4 < threshold){
+ LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+ return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+ } else {
+ return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ // merge join: used when enforced, as the enforcer fallback, or when the left operand is too large for a hash join
+ LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+ SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual(
+ plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+ ExternalSortExec outerSort2 = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID,sortSpecs2[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+ ExternalSortExec innerSort2 = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID,sortSpecs2[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+ return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
+ }
+
+ private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+ switch (algorithm) {
+ case IN_MEMORY_HASH_JOIN:
+ LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+ return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+ case MERGE_JOIN:
+ return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ default:
+ LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ }
+ } else {
+ return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ Enforcer enforcer = context.getEnforcer();
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ if (property != null) {
+ JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+ switch (algorithm) {
+ case IN_MEMORY_HASH_JOIN:
+ return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+
+ case MERGE_JOIN:
+ return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+
+ default:
+ LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
+ LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+ return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ }
+ } else {
+ return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
+ private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec)
+ throws IOException {
+ String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ long outerSize2 = estimateSizeRecursive(context, leftLineage);
+ long innerSize2 = estimateSizeRecursive(context, rightLineage);
+
+ PhysicalExec selectedRight;
+ PhysicalExec selectedLeft;
+
+ // HashJoinExec loads the smaller relation to memory.
+ if (outerSize2 <= innerSize2) {
+ selectedLeft = leftExec;
+ selectedRight = rightExec;
+ } else {
+ selectedLeft = rightExec;
+ selectedRight = leftExec;
+ }
+ LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
+ return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+ }
+
+ private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec)
+ throws IOException {
+ // merge join: used when enforced, as the enforcer fallback, or when both operands are too large for a hash join
+ LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
+ SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
+ leftExec.getSchema(), rightExec.getSchema());
+ ExternalSortExec outerSort3 = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID,sortSpecs3[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+ ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
+ new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+
+ return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+ }
+
+ private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+ String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ long outerSize2 = estimateSizeRecursive(context, leftLineage);
+ long innerSize2 = estimateSizeRecursive(context, rightLineage);
+ final long threshold = 1048576 * 128;
+ if (outerSize2 < threshold || innerSize2 < threshold) {
+ return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+ } else {
+ return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ }
+ }
+
private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
@@ -358,8 +529,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new StoreTableExec(ctx, plan, subOp);
}
- public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
- throws IOException {
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
"Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
new file mode 100644
index 0000000..aebfdb5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+
+ /**
+  * Hash-based physical operator for a full outer join.
+  *
+  * On the first call to {@link #next()} the whole right input is loaded into an in-memory hash
+  * table keyed by the join key columns. The left input is then streamed: every left tuple is
+  * probed against the table and joined with the right tuples that satisfy the join qualifier; a
+  * left tuple without any qualifying partner is emitted once, padded with nulls on the right side.
+  * When the left input is exhausted, the right tuples stored under keys that never produced a
+  * join result are emitted, padded with nulls on the left side.
+  *
+  * NOTE(review): the "matched" flag is kept per join key, not per right tuple. If the join
+  * qualifier contains more than the key equalities, a right tuple that never satisfied the
+  * qualifier but shares its key with one that did is not emitted as unmatched -- confirm whether
+  * such qualifiers can reach this operator.
+  */
+ public class FullOuterHashJoinExec extends BinaryPhysicalExec {
+   // from logical plan
+   protected JoinNode plan;
+   protected EvalNode joinQual;
+
+   protected List<Column[]> joinKeyPairs;
+
+   // temporal tuples and states for the hash join
+   protected boolean first = true;
+   protected FrameTuple frameTuple;
+   protected Tuple outTuple = null;
+   protected Map<Tuple, List<Tuple>> tupleSlots;
+   protected Iterator<Tuple> iterator = null;
+   protected EvalContext qualCtx;
+   protected Tuple leftTuple;
+   protected Tuple leftKeyTuple;
+
+   protected int [] leftKeyList;
+   protected int [] rightKeyList;
+
+   protected boolean finished = false;
+   protected boolean shouldGetLeftTuple = true;
+
+   // projection
+   protected final Projector projector;
+   protected final EvalContext [] evalContexts;
+
+   private int rightNumCols;
+   private int leftNumCols;
+   private Map<Tuple, Boolean> matched;
+
+   // true once the current left tuple has produced at least one join result
+   private boolean leftTupleJoined = false;
+
+   // cursors used while draining the unmatched right tuples, so that every call to
+   // getNextUnmatchedRight() resumes where the previous one stopped
+   private Iterator<Map.Entry<Tuple, Boolean>> unmatchedKeyCursor = null;
+   private Iterator<Tuple> unmatchedTupleCursor = null;
+
+   /**
+    * @param context task attempt context handed to the parent operator
+    * @param plan    logical join node; supplies the join qualifier, targets and output schema
+    * @param outer   left (probe) input
+    * @param inner   right (build) input, fully loaded into memory on the first next() call
+    */
+   public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+       PhysicalExec inner) {
+     super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+         plan.getOutSchema(), outer, inner);
+     this.plan = plan;
+     this.joinQual = plan.getJoinQual();
+     this.qualCtx = joinQual.newContext();
+     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+     // this map mirrors tupleSlots (same keys). For each join key it holds a flag, initially
+     // false, telling whether the key produced at least one join result with a left tuple
+     this.matched = new HashMap<Tuple, Boolean>(10000);
+
+     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
+         outer.getSchema(), inner.getSchema());
+
+     leftKeyList = new int[joinKeyPairs.size()];
+     rightKeyList = new int[joinKeyPairs.size()];
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+     }
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+     }
+
+     // for projection
+     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+     this.evalContexts = projector.renew();
+
+     // for join
+     frameTuple = new FrameTuple();
+     outTuple = new VTuple(outSchema.getColumnNum());
+     leftKeyTuple = new VTuple(leftKeyList.length);
+
+     leftNumCols = outer.getSchema().getColumnNum();
+     rightNumCols = inner.getSchema().getColumnNum();
+   }
+
+   /**
+    * Copies the join key columns of a left tuple into the given key tuple.
+    *
+    * @param outerTuple left tuple to read the key columns from
+    * @param keyTuple   tuple receiving the key values, one slot per join key
+    */
+   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+     for (int i = 0; i < leftKeyList.length; i++) {
+       keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+     }
+   }
+
+   /**
+    * Returns the next right tuple stored under a join key whose matched flag is still false.
+    *
+    * Successive calls walk the keys and, per key, the stored tuples exactly once; the position is
+    * remembered between calls instead of restarting the scan every time. The hash table itself is
+    * not modified.
+    *
+    * @return the next unmatched right tuple, or null when there is none left
+    */
+   public Tuple getNextUnmatchedRight() {
+     if (unmatchedKeyCursor == null) {
+       unmatchedKeyCursor = matched.entrySet().iterator();
+     }
+
+     for (;;) {
+       if (unmatchedTupleCursor != null && unmatchedTupleCursor.hasNext()) {
+         return unmatchedTupleCursor.next();
+       }
+       unmatchedTupleCursor = null;
+
+       // advance to the next key that never produced a join result
+       while (unmatchedTupleCursor == null && unmatchedKeyCursor.hasNext()) {
+         Map.Entry<Tuple, Boolean> entry = unmatchedKeyCursor.next();
+         if (!entry.getValue()) {
+           List<Tuple> bucket = tupleSlots.get(entry.getKey());
+           if (bucket != null) {
+             unmatchedTupleCursor = bucket.iterator();
+           }
+         }
+       }
+
+       if (unmatchedTupleCursor == null) {
+         return null; // no unmatched key left
+       }
+     }
+   }
+
+   /**
+    * Produces the next joined tuple.
+    *
+    * @return the projected output tuple (the same instance is reused across calls), or null once
+    *         both the left input and the unmatched right tuples are exhausted
+    * @throws IOException propagated from the child operators
+    */
+   public Tuple next() throws IOException {
+     if (first) {
+       loadRightToHashTable();
+     }
+
+     Tuple rightTuple;
+     boolean found = false;
+
+     while (!finished) {
+       if (shouldGetLeftTuple) { // initially, it is true.
+         // getting new outer
+         leftTuple = leftChild.next();
+         if (leftTuple == null) {
+           // the left input is exhausted: start (or continue) emitting the right tuples that
+           // never joined, padded with nulls on the left side
+           Tuple unmatchedRightTuple = getNextUnmatchedRight();
+           if (unmatchedRightTuple == null) {
+             // outTuple is kept allocated (not set to null) so the operator works after rescan()
+             finished = true;
+             return null;
+           }
+           return project(TupleUtil.createNullPaddedTuple(leftNumCols), unmatchedRightTuple);
+         }
+
+         // getting corresponding right
+         getKeyLeftTuple(leftTuple, leftKeyTuple);
+         List<Tuple> bucket = tupleSlots.get(leftKeyTuple);
+         if (bucket == null) {
+           // no right tuple carries this key, but a full outer join keeps the left tuple anyway
+           shouldGetLeftTuple = true;
+           return project(leftTuple, TupleUtil.createNullPaddedTuple(rightNumCols));
+         }
+         iterator = bucket.iterator();
+         shouldGetLeftTuple = false;
+         leftTupleJoined = false;
+       }
+
+       // getting a next right tuple on in-memory hash table.
+       rightTuple = iterator.next();
+       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+       joinQual.eval(qualCtx, inSchema, frameTuple);
+       if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+         projector.eval(evalContexts, frameTuple);
+         projector.terminate(evalContexts, outTuple);
+         found = true;
+         leftTupleJoined = true;
+         // the key already exists in the map (it mirrors tupleSlots), so this only flips the
+         // flag and does not store the reused leftKeyTuple instance as a key
+         matched.put(leftKeyTuple, true);
+       }
+
+       if (!iterator.hasNext()) { // no more right tuples for this hash key
+         shouldGetLeftTuple = true;
+         if (!leftTupleJoined) {
+           // every candidate under this key failed the join qualifier: the left tuple must
+           // still appear in the result, padded with nulls on the right side
+           project(leftTuple, TupleUtil.createNullPaddedTuple(rightNumCols));
+           found = true;
+         }
+       }
+
+       if (found) {
+         break;
+       }
+     }
+
+     // the loop is skipped entirely when the operator is already finished
+     return found ? outTuple : null;
+   }
+
+   /**
+    * Projects the combination of the two tuples into the shared output tuple.
+    */
+   private Tuple project(Tuple left, Tuple right) {
+     frameTuple.set(left, right);
+     projector.eval(evalContexts, frameTuple);
+     projector.terminate(evalContexts, outTuple);
+     return outTuple;
+   }
+
+   /**
+    * Reads the whole right input and stores every tuple in the hash table under its join key;
+    * each newly seen key is registered as "not matched yet".
+    *
+    * @throws IOException propagated from the right child
+    */
+   protected void loadRightToHashTable() throws IOException {
+     Tuple tuple;
+
+     while ((tuple = rightChild.next()) != null) {
+       Tuple keyTuple = new VTuple(joinKeyPairs.size());
+       for (int i = 0; i < rightKeyList.length; i++) {
+         keyTuple.put(i, tuple.get(rightKeyList[i]));
+       }
+
+       List<Tuple> bucket = tupleSlots.get(keyTuple);
+       if (bucket == null) {
+         bucket = new ArrayList<Tuple>();
+         tupleSlots.put(keyTuple, bucket);
+         matched.put(keyTuple, false);
+       }
+       bucket.add(tuple);
+     }
+     first = false;
+   }
+
+   /**
+    * Resets the operator so that the next call to next() rebuilds the hash table and restarts
+    * the join from the beginning.
+    */
+   @Override
+   public void rescan() throws IOException {
+     super.rescan();
+
+     tupleSlots.clear();
+     matched.clear(); // must stay in sync with tupleSlots
+     unmatchedKeyCursor = null;
+     unmatchedTupleCursor = null;
+     first = true;
+
+     finished = false;
+     iterator = null;
+     leftTupleJoined = false;
+     shouldGetLeftTuple = true;
+   }
+
+   /**
+    * Releases the in-memory hash table and its bookkeeping.
+    */
+   public void close() throws IOException {
+     tupleSlots.clear();
+     matched.clear();
+     unmatchedKeyCursor = null;
+     unmatchedTupleCursor = null;
+   }
+
+   public JoinNode getPlan() {
+     return this.plan;
+   }
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
new file mode 100644
index 0000000..2c11fea
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode joinNode;
+ private EvalNode joinQual;
+ private EvalContext qualCtx;
+
+ // temporal tuples and states for nested loop join
+ private FrameTuple frameTuple;
+ private Tuple leftTuple = null;
+ private Tuple rightTuple = null;
+ private Tuple outTuple = null;
+ private Tuple leftNext = null;
+
+ private final List<Tuple> leftTupleSlots;
+ private final List<Tuple> rightTupleSlots;
+
+ private JoinTupleComparator joincomparator = null;
+ private TupleComparator[] tupleComparator = null;
+
+ private final static int INITIAL_TUPLE_SLOT = 10000;
+
+ private boolean end = false;
+
+ // projection
+ private final Projector projector;
+ private final EvalContext [] evalContexts;
+
+ private int rightNumCols;
+ private int leftNumCols;
+ private int posRightTupleSlots = -1;
+ private int posLeftTupleSlots = -1;
+ boolean endInPopulationStage = false;
+ private boolean initRightDone = false;
+
+ public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+ PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+ Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+ "but there is no join condition");
+ this.joinNode = plan;
+ this.joinQual = plan.getJoinQual();
+ this.qualCtx = this.joinQual.newContext();
+
+ this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ SortSpec[][] sortSpecs = new SortSpec[2][];
+ sortSpecs[0] = leftSortKey;
+ sortSpecs[1] = rightSortKey;
+
+ this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+ rightChild.getSchema(), sortSpecs);
+ this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+ plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+ // for projection
+ this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.evalContexts = projector.renew();
+
+ // for join
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.getColumnNum());
+
+ leftNumCols = leftChild.getSchema().getColumnNum();
+ rightNumCols = rightChild.getSchema().getColumnNum();
+ }
+
+ public JoinNode getPlan(){
+ return this.joinNode;
+ }
+
+ public Tuple next() throws IOException {
+ Tuple previous;
+
+ for (;;) {
+ boolean newRound = false;
+ if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+ newRound = true;
+ }
+ if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+ newRound = true;
+ }
+
+ if(newRound == true){
+
+ if (end) {
+
+ ////////////////////////////////////////////////////////////////////////
+ // FINALIZING STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // the finalizing stage, where remaining tuples on the right are
+ // transformed into left-padded results while tuples on the left
+ // are transformed into right-padded results
+
+ // before exit, a left-padded tuple should be built for all remaining
+ // right side and a right-padded tuple should be built for all remaining
+ // left side
+
+ if (initRightDone == false) {
+ // maybe the left operand was empty => the right one didn't have the chance to initialize
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ }
+
+ if((leftTuple == null) && (rightTuple == null)) {
+ return null;
+ }
+
+ if((leftTuple == null) && (rightTuple != null)){
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ rightTuple = rightChild.next();
+ return outTuple;
+ }
+
+ if((leftTuple != null) && (rightTuple == null)){
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+ frameTuple.set(leftTuple, nullPaddedTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ leftTuple = leftChild.next();
+ return outTuple;
+ }
+ } // if end
+
+ ////////////////////////////////////////////////////////////////////////
+ // INITIALIZING STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // initializing stage, reading the first tuple on each side
+ if (leftTuple == null) {
+ leftTuple = leftChild.next();
+ if( leftTuple == null){
+ end = true;
+ continue;
+ }
+ }
+ if (rightTuple == null) {
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ if (rightTuple == null) {
+ end = true;
+ continue;
+ }
+ }
+
+ // reset tuple slots for a new round
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+
+ ////////////////////////////////////////////////////////////////////////
+ // Comparison and Move Forward Stage
+ ////////////////////////////////////////////////////////////////////////
+ // advance alternatively on each side until a match is found
+ int cmp;
+ while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+ if (cmp > 0) {
+
+ //before getting a new tuple from the right, a leftnullpadded tuple should be built
+ //output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ // BEFORE RETURN, MOVE FORWARD
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ end = true;
+ }
+
+ return outTuple;
+
+ } else if (cmp < 0) {
+ // before getting a new tuple from the left, a rightnullpadded tuple should be built
+ // output a tuple with the nulls padded rightTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+ frameTuple.set(leftTuple, nullPaddedTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ // BEFORE RETURN, MOVE FORWARD
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ end = true;
+ }
+
+ return outTuple;
+
+ } // if (cmp < 0)
+ } //while
+
+
+ ////////////////////////////////////////////////////////////////////////
+ // SLOTS POPULATION STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // once a match is found, retain all tuples with this key in tuple slots
+ // on each side
+ if(!end) {
+ endInPopulationStage = false;
+
+ boolean endLeft = false;
+ boolean endRight = false;
+
+ previous = new VTuple(leftTuple);
+ do {
+ leftTupleSlots.add(new VTuple(leftTuple));
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ endLeft = true;
+ }
+
+
+ } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+ posLeftTupleSlots = 0;
+
+
+ previous = new VTuple(rightTuple);
+ do {
+ rightTupleSlots.add(new VTuple(rightTuple));
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ endRight = true;
+ }
+
+ } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+ posRightTupleSlots = 0;
+
+ if ((endLeft == true) || (endRight == true)) {
+ end = true;
+ endInPopulationStage = true;
+ }
+
+ } // if end false
+ } // if newRound
+
+
+ ////////////////////////////////////////////////////////////////////////
+ // RESULTS STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // now output result matching tuples from the slots
+ // if either we haven't reached end on neither side, or we did reach end
+ // on one(or both) sides but that happened in the slots population step
+ // (i.e. refers to next round)
+ if(!end || (end && endInPopulationStage)){
+ if(posLeftTupleSlots == 0){
+ leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+ }
+
+ if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+ Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ frameTuple.set(leftNext, aTuple);
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ return outTuple;
+ } else {
+ // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+ if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+ //rewind the right slots position
+ posRightTupleSlots = 0;
+ Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+
+ frameTuple.set(leftNext, aTuple);
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ return outTuple;
+ }
+ }
+ } // the second if end false
+ } // for
+ }
+
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
new file mode 100644
index 0000000..bbf39dd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.tajo.datum.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class LeftOuterHashJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ protected JoinNode plan;
+ protected EvalNode joinQual;
+
+ protected List<Column[]> joinKeyPairs;
+
+ // temporal tuples and states for nested loop join
+ protected boolean first = true;
+ protected FrameTuple frameTuple;
+ protected Tuple outTuple = null;
+ protected Map<Tuple, List<Tuple>> tupleSlots;
+ protected Iterator<Tuple> iterator = null;
+ protected EvalContext qualCtx;
+ protected Tuple leftTuple;
+ protected Tuple leftKeyTuple;
+
+ protected int [] leftKeyList;
+ protected int [] rightKeyList;
+
+ protected boolean finished = false;
+ protected boolean shouldGetLeftTuple = true;
+
+ // projection
+ protected final Projector projector;
+ protected final EvalContext [] evalContexts;
+
+ private int rightNumCols;
+ private int leftNumCols;
+ private static final Log LOG = LogFactory.getLog(LeftOuterHashJoinExec.class);
+
+ public LeftOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+ PhysicalExec rightChild) {
+ super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+ plan.getOutSchema(), leftChild, rightChild);
+ this.plan = plan;
+ this.joinQual = plan.getJoinQual();
+ this.qualCtx = joinQual.newContext();
+ this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+ this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
+
+ leftKeyList = new int[joinKeyPairs.size()];
+ rightKeyList = new int[joinKeyPairs.size()];
+
+ for (int i = 0; i < joinKeyPairs.size(); i++) {
+ leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+ }
+
+ for (int i = 0; i < joinKeyPairs.size(); i++) {
+ rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+ }
+
+ // for projection
+ this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+ this.evalContexts = projector.renew();
+
+ // for join
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.getColumnNum());
+ leftKeyTuple = new VTuple(leftKeyList.length);
+
+ leftNumCols = leftChild.getSchema().getColumnNum();
+ rightNumCols = rightChild.getSchema().getColumnNum();
+ }
+
+ protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+ for (int i = 0; i < leftKeyList.length; i++) {
+ keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+ }
+ }
+
+ public Tuple next() throws IOException {
+ if (first) {
+ loadRightToHashTable();
+ }
+
+ Tuple rightTuple;
+ boolean found = false;
+
+ while(!finished) {
+
+ if (shouldGetLeftTuple) { // initially, it is true.
+ // getting new outer
+ leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = true;
+ return null;
+ }
+
+ // getting corresponding right
+ getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+ if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+ iterator = tupleSlots.get(leftKeyTuple).iterator();
+ shouldGetLeftTuple = false;
+ } else {
+ // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+ frameTuple.set(leftTuple, nullPaddedTuple);
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ shouldGetLeftTuple = true;
+ return outTuple;
+ }
+ }
+
+ // getting a next right tuple on in-memory hash table.
+ rightTuple = iterator.next();
+ frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+ joinQual.eval(qualCtx, inSchema, frameTuple);
+ if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+ projector.eval(evalContexts, frameTuple);
+ projector.terminate(evalContexts, outTuple);
+ found = true;
+ }
+
+ if (!iterator.hasNext()) { // no more right tuples for this hash key
+ shouldGetLeftTuple = true;
+ }
+
+ if (found) {
+ break;
+ }
+ }
+
+ return outTuple;
+ }
+
+ protected void loadRightToHashTable() throws IOException {
+ Tuple tuple;
+ Tuple keyTuple;
+
+ while ((tuple = rightChild.next()) != null) {
+ keyTuple = new VTuple(joinKeyPairs.size());
+ List<Tuple> newValue;
+ for (int i = 0; i < rightKeyList.length; i++) {
+ keyTuple.put(i, tuple.get(rightKeyList[i]));
+ }
+
+ if (tupleSlots.containsKey(keyTuple)) {
+ newValue = tupleSlots.get(keyTuple);
+ newValue.add(tuple);
+ tupleSlots.put(keyTuple, newValue);
+ } else {
+ newValue = new ArrayList<Tuple>();
+ newValue.add(tuple);
+ tupleSlots.put(keyTuple, newValue);
+ }
+ }
+ first = false;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+
+ tupleSlots.clear();
+ first = true;
+
+ finished = false;
+ iterator = null;
+ shouldGetLeftTuple = true;
+ }
+
+ public void close() throws IOException {
+ tupleSlots.clear();
+ }
+
+ public JoinNode getPlan() {
+ return this.plan;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
new file mode 100644
index 0000000..cc7b331
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+ /**
+  * Nested-loop physical operator for a left outer join.
+  *
+  * For every left tuple the right input is scanned from start to end (the right child is
+  * rescanned after each pass). Each pair satisfying the join qualifier is emitted; a left tuple
+  * for which the whole pass produced no pair is emitted once, padded with nulls on the right side.
+  * A plan without a join qualifier is treated as "every pair qualifies".
+  */
+ public class LeftOuterNLJoinExec extends BinaryPhysicalExec {
+   // from logical plan
+   private JoinNode plan;
+   // both stay null when the plan carries no join qualifier
+   private EvalNode joinQual;
+   private EvalContext qualCtx;
+
+   // temporal tuples and states for nested loop join
+   // true when the next LEFT tuple must be fetched before continuing the right scan
+   private boolean needNextLeftTuple;
+   private FrameTuple frameTuple;
+   private Tuple leftTuple = null;
+   private Tuple rightTuple = null;
+   private Tuple outTuple = null;
+
+   // projection
+   private final EvalContext [] evalContexts;
+   private final Projector projector;
+
+   // true once the current left tuple has been emitted at least once
+   private boolean foundAtLeastOneMatch;
+   private int rightNumCols;
+
+   /**
+    * @param context    task attempt context handed to the parent operator
+    * @param plan       logical join node; supplies the optional join qualifier, targets and schemas
+    * @param leftChild  left (outer loop) input
+    * @param rightChild right (inner loop) input, rescanned once per left tuple
+    */
+   public LeftOuterNLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+       PhysicalExec rightChild) {
+     super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+     this.plan = plan;
+
+     if (plan.hasJoinQual()) {
+       this.joinQual = plan.getJoinQual();
+       this.qualCtx = this.joinQual.newContext();
+     }
+
+     // for projection
+     projector = new Projector(inSchema, outSchema, plan.getTargets());
+     evalContexts = projector.renew();
+
+     // for join
+     needNextLeftTuple = true;
+     frameTuple = new FrameTuple();
+     outTuple = new VTuple(outSchema.getColumnNum());
+
+     foundAtLeastOneMatch = false;
+     rightNumCols = rightChild.getSchema().getColumnNum();
+   }
+
+   public JoinNode getPlan() {
+     return this.plan;
+   }
+
+   /**
+    * Produces the next joined tuple.
+    *
+    * @return the projected output tuple (the same instance is reused across calls), or null when
+    *         the left input is exhausted
+    * @throws IOException propagated from the child operators
+    */
+   public Tuple next() throws IOException {
+     for (;;) {
+       if (needNextLeftTuple) {
+         leftTuple = leftChild.next();
+         if (leftTuple == null) {
+           return null;
+         }
+         needNextLeftTuple = false;
+         // a new tuple from the left child has initially no matches on the right operand
+         foundAtLeastOneMatch = false;
+       }
+       rightTuple = rightChild.next();
+
+       if (rightTuple == null) {
+         // the pass over the right operand is finished for the current left tuple
+         if (!foundAtLeastOneMatch) {
+           // no pair qualified: output the left tuple with a null-padded right side
+           frameTuple.set(leftTuple, TupleUtil.createNullPaddedTuple(rightNumCols));
+           projector.eval(evalContexts, frameTuple);
+           projector.terminate(evalContexts, outTuple);
+           // we simulate we found a match, which is exactly the null padded one
+           foundAtLeastOneMatch = true;
+           needNextLeftTuple = true;
+           rightChild.rescan();
+           return outTuple;
+         }
+         needNextLeftTuple = true;
+         rightChild.rescan();
+         continue;
+       }
+
+       frameTuple.set(leftTuple, rightTuple);
+       if (satisfiesJoinQual()) {
+         projector.eval(evalContexts, frameTuple);
+         projector.terminate(evalContexts, outTuple);
+         foundAtLeastOneMatch = true;
+         return outTuple;
+       }
+     }
+   }
+
+   /**
+    * Evaluates the join qualifier on the pair currently held by frameTuple; a missing qualifier
+    * accepts every pair instead of failing with a NullPointerException.
+    */
+   private boolean satisfiesJoinQual() {
+     if (joinQual == null) {
+       return true;
+     }
+     joinQual.eval(qualCtx, inSchema, frameTuple);
+     return joinQual.terminate(qualCtx).asBool();
+   }
+
+   /**
+    * Resets the operator so that the next call to next() starts with a fresh left tuple.
+    */
+   @Override
+   public void rescan() throws IOException {
+     super.rescan();
+     needNextLeftTuple = true;
+     foundAtLeastOneMatch = false;
+   }
+ }