Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:17 UTC
[38/51] [partial] tajo git commit: TAJO-1761: Separate an integration
unit test kit into an independent module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
new file mode 100644
index 0000000..d7968fe
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestNLJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestNLJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private Path testDir;
+
+ private TableDesc employee;
+ private TableDesc people;
+
+ private MasterPlan masterPlan;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+
+ Schema schema = new Schema();
+ schema.addColumn("managerid", Type.INT4);
+ schema.addColumn("empid", Type.INT4);
+ schema.addColumn("memid", Type.INT4);
+ schema.addColumn("deptname", Type.TEXT);
+
+ TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+ Path employeePath = new Path(testDir, "employee.csv");
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(employeeMeta, schema, employeePath);
+ appender.init();
+ VTuple tuple = new VTuple(schema.size());
+ for (int i = 0; i < 50; i++) {
+ tuple.put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("dept_" + i)});
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+ employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath);
+ catalog.createTable(employee);
+
+ Schema peopleSchema = new Schema();
+ peopleSchema.addColumn("empid", Type.INT4);
+ peopleSchema.addColumn("fk_memid", Type.INT4);
+ peopleSchema.addColumn("name", Type.TEXT);
+ peopleSchema.addColumn("age", Type.INT4);
+ TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+ Path peoplePath = new Path(testDir, "people.csv");
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender.init();
+ tuple = new VTuple(peopleSchema.size());
+ for (int i = 1; i < 50; i += 2) {
+ tuple.put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("name_" + i),
+ DatumFactory.createInt4(30 + i)});
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+ people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+ catalog.createTable(people);
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+
+ masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select managerId, e.empId, deptName, e.memId from employee as e, people p",
+ "select managerId, e.empId, deptName, e.memId from employee as e inner join people as p on " +
+ "e.empId = p.empId and e.memId = p.fk_memId"
+ };
+
+ @Test
+ public final void testNLCrossJoin() throws IOException, TajoException {
+ FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
+ new Path(people.getUri()), Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testNLCrossJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
+ expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ int i = 0;
+ exec.init();
+ while (exec.next() != null) {
+ i++;
+ }
+ exec.close();
+ assertEquals(50*50/2, i); // cross join: 50 employee rows x 25 people rows = 1250
+ }
+
+ @Test
+ public final void testNLInnerJoin() throws IOException, TajoException {
+ FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
+ new Path(people.getUri()), Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testNLInnerJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
+ expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ count++;
+ assertTrue(i == tuple.getInt4(0));
+ assertTrue(i == tuple.getInt4(1));
+ assertTrue(("dept_" + i).equals(tuple.getText(2)));
+ assertTrue(10 + i == tuple.getInt4(3));
+ i += 2;
+ }
+ exec.close();
+ assertEquals(50 / 2, count);
+ }
+}
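
Both tests above follow the harness reused throughout this module: split the input table into FileFragments, wrap them in a TaskAttemptContext, plan the parsed query, then drain the resulting PhysicalExec. A condensed sketch of that pattern, using only classes and calls that appear in the diff above (it assumes the same setUp() fixtures, so it is an outline rather than a standalone compilable unit; the query string is a placeholder):

    // Split the stored table into fragments; Integer.MAX_VALUE yields one split per file.
    FileFragment[] frags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
        new Path(employee.getUri()), Integer.MAX_VALUE);

    // The task context carries the input fragments, working directory, and enforcer hints.
    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
        LocalTajoTestingUtility.newTaskAttemptId(), frags, workDir);
    ctx.setEnforcer(new Enforcer()); // empty enforcer: the planner picks default operators

    // Parse SQL into an AST, build the logical plan, then compile a physical operator tree.
    Expr expr = analyzer.parse("select * from employee"); // any query over the fixture tables
    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr)
        .getRootBlock().getRoot();
    PhysicalExec exec = new PhysicalPlannerImpl(conf).createPlan(ctx, plan);

    // PhysicalExec is pull-based: init(), fetch tuples with next() until null, then close().
    exec.init();
    int rows = 0;
    while (exec.next() != null) {
      rows++;
    }
    exec.close();
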
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
new file mode 100644
index 0000000..69b36c5
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -0,0 +1,1144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.*;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.SortEnforce.SortAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestPhysicalPlanner {
+ private static TajoTestingCluster util;
+ private static TajoConf conf;
+ private static CatalogService catalog;
+ private static SQLAnalyzer analyzer;
+ private static LogicalPlanner planner;
+ private static LogicalOptimizer optimizer;
+ private static FileTablespace sm;
+ private static Path testDir;
+ private static Session session = LocalTajoTestingUtility.createDummySession();
+ private static QueryContext defaultContext;
+
+ private static TableDesc employee = null;
+ private static TableDesc score = null;
+ private static TableDesc largeScore = null;
+
+ private static MasterPlan masterPlan;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ util = new TajoTestingCluster();
+
+ util.startCatalogCluster();
+ conf = util.getConfiguration();
+ testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner");
+ sm = TablespaceManager.getLocalFs();
+ catalog = util.getMiniCatalogCluster().getCatalog();
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ for (FunctionDesc funcDesc : FunctionLoader.findLegacyFunctions()) {
+ catalog.createFunction(funcDesc);
+ }
+
+ Schema employeeSchema = new Schema();
+ employeeSchema.addColumn("name", Type.TEXT);
+ employeeSchema.addColumn("empid", Type.INT4);
+ employeeSchema.addColumn("deptname", Type.TEXT);
+
+ Schema scoreSchema = new Schema();
+ scoreSchema.addColumn("deptname", Type.TEXT);
+ scoreSchema.addColumn("class", Type.TEXT);
+ scoreSchema.addColumn("score", Type.INT4);
+ scoreSchema.addColumn("nullable", Type.TEXT);
+
+ TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+
+
+ Path employeePath = new Path(testDir, "employee.csv");
+ Appender appender = sm.getAppender(employeeMeta, employeeSchema, employeePath);
+ appender.init();
+ VTuple tuple = new VTuple(employeeSchema.size());
+ for (int i = 0; i < 100; i++) {
+ tuple.put(new Datum[] {DatumFactory.createText("name_" + i),
+ DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i)});
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+ employee = new TableDesc(
+ CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), employeeSchema, employeeMeta,
+ employeePath.toUri());
+ catalog.createTable(employee);
+
+ Path scorePath = new Path(testDir, "score");
+ TableMeta scoreMeta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet());
+ appender = sm.getAppender(scoreMeta, scoreSchema, scorePath);
+ appender.init();
+ score = new TableDesc(
+ CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta,
+ scorePath.toUri());
+ tuple = new VTuple(scoreSchema.size());
+ int m = 0;
+ for (int i = 1; i <= 5; i++) {
+ for (int k = 3; k < 5; k++) {
+ for (int j = 1; j <= 3; j++) {
+ tuple.put(
+ new Datum[] {
+ DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5)
+ DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2)
+ DatumFactory.createInt4(j), // 1 ~ 3
+ m % 3 == 1 ? DatumFactory.createText("one") : NullDatum.get()});
+ appender.addTuple(tuple);
+ m++;
+ }
+ }
+ }
+ appender.flush();
+ appender.close();
+
+ defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ catalog.createTable(score);
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+ optimizer = new LogicalOptimizer(conf, catalog);
+ masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+
+ createLargeScoreTable();
+ }
+
+ public static void createLargeScoreTable() throws IOException, TajoException {
+
+ // Preparing a large table
+ Path scoreLargePath = new Path(testDir, "score_large");
+ CommonTestingUtil.cleanupTestDir(scoreLargePath.toString());
+
+ Schema scoreSchema = score.getSchema();
+ TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet());
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(scoreLargeMeta, scoreSchema, scoreLargePath);
+ appender.enableStats();
+ appender.init();
+ largeScore = new TableDesc(
+ CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score_large"), scoreSchema, scoreLargeMeta,
+ scoreLargePath.toUri());
+
+ VTuple tuple = new VTuple(scoreSchema.size());
+ int m = 0;
+ for (int i = 1; i <= 40000; i++) {
+ for (int k = 3; k < 5; k++) { // |{3,4}| = 2
+ for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3
+ tuple.put(
+ new Datum[] {
+ DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5)
+ DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2)
+ DatumFactory.createInt4(j), // 1 ~ 3
+ m % 3 == 1 ? DatumFactory.createText("one") : NullDatum.get()});
+ appender.addTuple(tuple);
+ m++;
+ }
+ }
+ }
+ appender.flush();
+ appender.close();
+ largeScore.setStats(appender.getStats());
+ catalog.createTable(largeScore);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ private String[] QUERIES = {
+ "select name, empId, deptName from employee", // 0
+ "select name, empId, e.deptName, manager from employee as e, dept as dp", // 1
+ "select name, empId, e.deptName, manager, score from employee as e, dept, score", // 2
+ "select p.deptName, sum(score) from dept as p, score group by p.deptName having sum(score) > 30", // 3
+ "select p.deptName, score from dept as p, score order by score asc", // 4
+ "select name from employee where empId = 100", // 5
+ "select deptName, class, score from score", // 6
+ "select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 7
+ "select count(*), max(score), min(score) from score", // 8
+ "select count(deptName) from score", // 9
+ "select managerId, empId, deptName from employee order by managerId, empId desc", // 10
+ "select deptName, nullable from score group by deptName, nullable", // 11
+ "select 3 < 4 as ineq, 3.5 * 2 as score", // 12
+ "select (1 > 0) and 3 > 1", // 13
+ "select sum(score), max(score), min(score) from score", // 14
+ "select deptname, sum(score), max(score), min(score) from score group by deptname", // 15
+ "select name from employee where empid >= 0", // 16
+ };
+
+ @Test
+ public final void testCreateScanPlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+ optimizer.optimize(plan);
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ Tuple tuple;
+ int i = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertTrue(tuple.contains(0));
+ assertTrue(tuple.contains(1));
+ assertTrue(tuple.contains(2));
+ i++;
+ }
+ exec.close();
+ assertEquals(100, i);
+ }
+
+ @Test
+ public final void testCreateScanWithFilterPlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanWithFilterPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[16]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+ optimizer.optimize(plan);
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ Tuple tuple;
+ int i = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertTrue(tuple.contains(0));
+ i++;
+ }
+ exec.close();
+ assertEquals(100, i);
+ }
+
+ @Test
+ public final void testGroupByPlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[7]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ int i = 0;
+ Tuple tuple;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ exec.close();
+ assertEquals(10, i);
+ }
+
+ @Test
+ public final void testHashGroupByPlanWithALLField() throws IOException, TajoException {
+ // TODO - currently, this query does not use a hash-based group-by operator.
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY +
+ "/testHashGroupByPlanWithALLField");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[15]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ int i = 0;
+ Tuple tuple;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertEquals(12, tuple.getInt4(1)); // sum
+ assertEquals(3, tuple.getInt4(2)); // max
+ assertEquals(1, tuple.getInt4(3)); // min
+ i++;
+ }
+ exec.close();
+ assertEquals(5, i);
+ }
+
+ @Test
+ public final void testSortGroupByPlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortGroupByPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[]{frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[7]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+
+ /*HashAggregateExec hashAgg = (HashAggregateExec) exec;
+
+ SeqScanExec scan = (SeqScanExec) hashAgg.getSubOp();
+
+ Column [] grpColumns = hashAgg.getAnnotation().getGroupingColumns();
+ QueryBlock.SortSpec [] specs = new QueryBlock.SortSpec[grpColumns.length];
+ for (int i = 0; i < grpColumns.length; i++) {
+ specs[i] = new QueryBlock.SortSpec(grpColumns[i], true, false);
+ }
+ SortNode annotation = new SortNode(specs);
+ annotation.setInSchema(scan.getSchema());
+ annotation.setOutSchema(scan.getSchema());
+ SortExec sort = new SortExec(annotation, scan);
+ exec = new SortAggregateExec(hashAgg.getAnnotation(), sort);*/
+
+ int i = 0;
+ Tuple tuple;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ assertEquals(10, i);
+
+ exec.rescan();
+ i = 0;
+ while ((tuple = exec.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ exec.close();
+ assertEquals(10, i);
+ }
+
+ private String[] CreateTableAsStmts = {
+ "create table grouped1 as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 0
+ "create table grouped2 using rcfile as select deptName, class, sum(score), max(score), min(score) from score group by deptName, class", // 1
+ "create table grouped3 partition by column (dept text, class text) as select sum(score), max(score), min(score), deptName, class from score group by deptName, class", // 2,
+ "create table score_large_output as select * from score_large", // 4
+ "CREATE TABLE score_part (deptname text, score int4, nullable text) PARTITION BY COLUMN (class text) " +
+ "AS SELECT deptname, score, nullable, class from score_large" // 5
+ };
+
+ @Test
+ public final void testStorePlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ ctx.setOutputPath(new Path(workDir, "grouped1"));
+
+ Expr context = analyzer.parse(CreateTableAsStmts[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ TableMeta outputMeta = CatalogUtil.newTableMeta("TEXT");
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
+ scanner.init();
+ Tuple tuple;
+ int i = 0;
+ while ((tuple = scanner.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ assertEquals(10, i);
+ scanner.close();
+
+ // Examine the statistics information
+ assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+ }
+
+ @Test
+ public final void testStorePlanWithMaxOutputFileSize() throws IOException, TajoException,
+ CloneNotSupportedException {
+
+ TableStats stats = largeScore.getStats();
+ assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB);
+
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
+ new Path(largeScore.getUri()), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithMaxOutputFileSize");
+
+ QueryContext queryContext = new QueryContext(conf, session);
+ queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
+
+ TaskAttemptContext ctx = new TaskAttemptContext(
+ queryContext,
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ ctx.setOutputPath(new Path(workDir, "maxOutput"));
+
+ Expr context = analyzer.parse(CreateTableAsStmts[3]);
+
+ LogicalPlan plan = planner.createPlan(queryContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ // executing StoreTableExec
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ // checking the number of punctuated files
+ int expectedFileNum = (int) (stats.getNumBytes() / (float) StorageUnit.MB);
+ FileSystem fs = ctx.getOutputPath().getFileSystem(conf);
+ FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent());
+ assertEquals(expectedFileNum, statuses.length);
+
+ // checking the file contents
+ long totalNum = 0;
+ for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
+ CatalogUtil.newTableMeta("TEXT"),
+ rootNode.getOutSchema(),
+ status.getPath());
+
+ scanner.init();
+ while ((scanner.next()) != null) {
+ totalNum++;
+ }
+ scanner.close();
+ }
+ assertTrue(totalNum == ctx.getResultStats().getNumRows());
+ }
+
+ @Test
+ public final void testStorePlanWithRCFile() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithRCFile");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ ctx.setOutputPath(new Path(workDir, "grouped2"));
+
+ Expr context = analyzer.parse(CreateTableAsStmts[1]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ TableMeta outputMeta = CatalogUtil.newTableMeta("RCFILE");
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
+ outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
+ scanner.init();
+ Tuple tuple;
+ int i = 0;
+ while ((tuple = scanner.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ assertEquals(10, i);
+ scanner.close();
+
+ // Examine the statistics information
+ assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+ }
+
+ @Test
+ public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ ctx.setOutputPath(new Path(workDir, "grouped3"));
+
+ Expr context = analyzer.parse(CreateTableAsStmts[2]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ assertTrue(exec instanceof SortBasedColPartitionStoreExec);
+ }
+
+ @Test
+ public final void testEnforceForHashBasedColumnPartitionStorePlan() throws IOException, TajoException {
+
+ Expr context = analyzer.parse(CreateTableAsStmts[2]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan);
+ CreateTableNode createTableNode = rootNode.getChild();
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION);
+
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(enforcer);
+ ctx.setOutputPath(new Path(workDir, "grouped4"));
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ assertTrue(exec instanceof HashBasedColPartitionStoreExec);
+ }
+
+ @Test
+ public final void testEnforceForSortBasedColumnPartitionStorePlan() throws IOException, TajoException {
+
+ Expr context = analyzer.parse(CreateTableAsStmts[2]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalRootNode rootNode = (LogicalRootNode) optimizer.optimize(plan);
+ CreateTableNode createTableNode = rootNode.getChild();
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION);
+
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(enforcer);
+ ctx.setOutputPath(new Path(workDir, "grouped5"));
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ assertTrue(exec instanceof SortBasedColPartitionStoreExec);
+ }
+
+ @Test
+ public final void testPartitionedStorePlan() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] },
+ CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlan"));
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[7]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+
+ int numPartitions = 3;
+ Column key1 = new Column("default.score.deptname", Type.TEXT);
+ Column key2 = new Column("default.score.class", Type.TEXT);
+ DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+ ShuffleType.HASH_SHUFFLE, numPartitions);
+ dataChannel.setShuffleKeys(new Column[]{key1, key2});
+ ctx.setDataChannel(dataChannel);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
+
+ FileSystem fs = sm.getFileSystem();
+ QueryId queryId = id.getTaskId().getExecutionBlockId().getQueryId();
+ ExecutionBlockId ebId = id.getTaskId().getExecutionBlockId();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+ ctx.getHashShuffleAppenderManager().close(ebId);
+
+ String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+ Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir);
+ FileStatus [] list = fs.listStatus(queryLocalTmpDir);
+
+ List<Fragment> fragments = new ArrayList<Fragment>();
+ for (FileStatus status : list) {
+ assertTrue(status.isDirectory());
+ FileStatus [] files = fs.listStatus(status.getPath());
+ for (FileStatus eachFile: files) {
+ fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen()));
+ }
+ }
+
+ assertEquals(numPartitions, fragments.size());
+ Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+ scanner.init();
+
+ Tuple tuple;
+ int i = 0;
+ while ((tuple = scanner.next()) != null) {
+ assertEquals(6, tuple.getInt4(2)); // sum
+ assertEquals(3, tuple.getInt4(3)); // max
+ assertEquals(1, tuple.getInt4(4)); // min
+ i++;
+ }
+ assertEquals(10, i);
+ scanner.close();
+
+ // Examine the statistics information
+ assertEquals(10, ctx.getResultStats().getNumRows().longValue());
+
+ fs.delete(queryLocalTmpDir, true);
+ }
+
+ @Test
+ public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, TajoException {
+
+ // Preparing working dir and input fragments
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
+ new Path(largeScore.getUri()), Integer.MAX_VALUE);
+ TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlanWithMaxFileSize");
+
+ // Setting session variables
+ QueryContext queryContext = new QueryContext(conf, session);
+ queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
+
+ // Preparing task context
+ TaskAttemptContext ctx = new TaskAttemptContext(queryContext, id, new FileFragment[] { frags[0] }, workDir);
+ ctx.setOutputPath(new Path(workDir, "part-01-000000"));
+ // SortBasedColumnPartitionStoreExec will be chosen by default.
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(CreateTableAsStmts[4]);
+ LogicalPlan plan = planner.createPlan(queryContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ // Executing CREATE TABLE PARTITION BY
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ FileSystem fs = sm.getFileSystem();
+ FileStatus [] list = fs.listStatus(workDir);
+ // checking the number of partitions
+ assertEquals(2, list.length);
+
+ List<Fragment> fragments = Lists.newArrayList();
+ int i = 0;
+ for (FileStatus status : list) {
+ assertTrue(status.isDirectory());
+
+ long fileVolumeSum = 0;
+ FileStatus [] fileStatuses = fs.listStatus(status.getPath());
+ for (FileStatus fileStatus : fileStatuses) {
+ fileVolumeSum += fileStatus.getLen();
+ fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, fileStatus.getLen()));
+ }
+ assertTrue("checking the meaningfulness of the test", fileVolumeSum > StorageUnit.MB && fileStatuses.length > 1);
+
+ long expectedFileNum = (long) Math.ceil(fileVolumeSum / (float) StorageUnit.MB);
+ assertEquals(expectedFileNum, fileStatuses.length);
+ }
+ TableMeta outputMeta = CatalogUtil.newTableMeta("TEXT");
+ Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+ scanner.init();
+
+ long rowNum = 0;
+ while (scanner.next() != null) {
+ rowNum++;
+ }
+
+ // checking the number of all written rows
+ assertTrue(largeScore.getStats().getNumRows() == rowNum);
+
+ scanner.close();
+ }
+
+ @Test
+ public final void testPartitionedStorePlanWithEmptyGroupingSet()
+ throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY +
+ "/testPartitionedStorePlanWithEmptyGroupingSet");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ id, new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[14]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+ int numPartitions = 1;
+ DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+ ShuffleType.HASH_SHUFFLE, numPartitions);
+ dataChannel.setShuffleKeys(new Column[]{});
+ ctx.setDataChannel(dataChannel);
+ optimizer.optimize(plan);
+
+ TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
+
+ FileSystem fs = sm.getFileSystem();
+ QueryId queryId = id.getTaskId().getExecutionBlockId().getQueryId();
+ ExecutionBlockId ebId = id.getTaskId().getExecutionBlockId();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+ ctx.getHashShuffleAppenderManager().close(ebId);
+
+ String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+ Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir);
+ FileStatus [] list = fs.listStatus(queryLocalTmpDir);
+
+ List<Fragment> fragments = new ArrayList<Fragment>();
+ for (FileStatus status : list) {
+ assertTrue(status.isDirectory());
+ FileStatus [] files = fs.listStatus(status.getPath());
+ for (FileStatus eachFile: files) {
+ fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen()));
+ }
+ }
+
+ assertEquals(numPartitions, fragments.size());
+
+ Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+ scanner.init();
+ Tuple tuple;
+ int i = 0;
+ while ((tuple = scanner.next()) != null) {
+ assertEquals(60, tuple.getInt4(0)); // sum
+ assertEquals(3, tuple.getInt4(1)); // max
+ assertEquals(1, tuple.getInt4(2)); // min
+ i++;
+ }
+ assertEquals(1, i);
+ scanner.close();
+
+ // Examine the statistics information
+ assertEquals(1, ctx.getResultStats().getNumRows().longValue());
+ fs.delete(queryLocalTmpDir, true);
+ }
+
+ @Test
+ public final void testAggregationFunction() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testAggregationFunction");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[8]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ // Set all aggregation functions to the first phase mode
+ GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ function.setFirstPhase();
+ }
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ exec.init();
+ Tuple tuple = exec.next();
+ assertEquals(30, tuple.getInt8(0));
+ assertEquals(3, tuple.getInt4(1));
+ assertEquals(1, tuple.getInt4(2));
+ assertNull(exec.next());
+ exec.close();
+ }
+
+ @Test
+ public final void testCountFunction() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCountFunction");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[9]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ // Set all aggregation functions to the first phase mode
+ GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+ for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+ function.setFirstPhase();
+ }
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ Tuple tuple = exec.next();
+ assertEquals(30, tuple.getInt8(0));
+ assertNull(exec.next());
+ exec.close();
+ }
+
+ @Test
+ public final void testGroupByWithNullValue() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByWithNullValue");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[11]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ int count = 0;
+ exec.init();
+ while(exec.next() != null) {
+ count++;
+ }
+ exec.close();
+ assertEquals(10, count);
+ }
+
+ @Test
+ public final void testUnionPlan() throws IOException, TajoException, CloneNotSupportedException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testUnionPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+ LogicalRootNode root = (LogicalRootNode) rootNode;
+ UnionNode union = plan.createNode(UnionNode.class);
+ union.setLeftChild((LogicalNode) root.getChild().clone());
+ union.setRightChild((LogicalNode) root.getChild().clone());
+ root.setChild(union);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+
+ int count = 0;
+ exec.init();
+ while(exec.next() != null) {
+ count++;
+ }
+ exec.close();
+ assertEquals(200, count);
+ }
+
+ @Test
+ public final void testEvalExpr() throws IOException, TajoException {
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testEvalExpr");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] { }, workDir);
+ Expr expr = analyzer.parse(QUERIES[12]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ Tuple tuple;
+ exec.init();
+ tuple = exec.next();
+ exec.close();
+ assertEquals(true, tuple.getBool(0));
+ assertTrue(7.0d == tuple.getFloat8(1));
+
+ expr = analyzer.parse(QUERIES[13]);
+ plan = planner.createPlan(defaultContext, expr);
+ rootNode = optimizer.optimize(plan);
+
+ phyPlanner = new PhysicalPlannerImpl(conf);
+ exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ tuple = exec.next();
+ exec.close();
+ assertEquals(DatumFactory.createBool(true), tuple.asDatum(0));
+ }
+
+ public final String [] createIndexStmt = {
+ "create index idx_employee on employee using TWO_LEVEL_BIN_TREE (name null first, empId desc)"
+ };
+
+ @Test
+ public final void testCreateIndex() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex");
+ Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee");
+ if (sm.getFileSystem().exists(indexPath)) {
+ sm.getFileSystem().delete(indexPath, true);
+ }
+
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(createIndexStmt[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ while (exec.next() != null) {
+ }
+ exec.close();
+
+ FileStatus[] list = sm.getFileSystem().listStatus(indexPath);
+ assertEquals(2, list.length);
+ }
+
+ final static String [] duplicateElimination = {
+ "select distinct deptname from score",
+ };
+
+ @Test
+ public final void testDuplicateEliminate() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(),
+ new Path(score.getUri()), Integer.MAX_VALUE);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testDuplicateEliminate");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(duplicateElimination[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ LogicalNode rootNode = optimizer.optimize(plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ Tuple tuple;
+
+ int cnt = 0;
+ Set<String> expected = Sets.newHashSet(
+ "name_1", "name_2", "name_3", "name_4", "name_5");
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertTrue(expected.contains(tuple.getText(0)));
+ cnt++;
+ }
+ exec.close();
+ assertEquals(5, cnt);
+ }
+
+ public String [] SORT_QUERY = {
+ "select name, empId from employee order by empId"
+ };
+
+ @Test
+ public final void testSortEnforcer() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortEnforcer");
+ Expr context = analyzer.parse(SORT_QUERY[0]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(exec instanceof MemSortExec);
+
+ context = analyzer.parse(SORT_QUERY[0]);
+ plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+ rootNode = plan.getRootBlock().getRoot();
+
+ sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+ enforcer = new Enforcer();
+ enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
+ ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ phyPlanner = new PhysicalPlannerImpl(conf);
+ exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(exec instanceof ExternalSortExec);
+ }
+
+ @Test
+ public final void testGroupByEnforcer() throws IOException, TajoException {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getUri()),
+ Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByEnforcer");
+ Expr context = analyzer.parse(QUERIES[7]);
+ LogicalPlan plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceHashAggregation(groupByNode.getPID());
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertNotNull(PhysicalPlanUtil.findExecutor(exec, HashAggregateExec.class));
+
+ context = analyzer.parse(QUERIES[7]);
+ plan = planner.createPlan(defaultContext, context);
+ optimizer.optimize(plan);
+ rootNode = plan.getRootBlock().getRoot();
+
+ groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+ enforcer = new Enforcer();
+ enforcer.enforceSortAggregation(groupByNode.getPID(), null);
+ ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
+ new FileFragment[] {frags[0]}, workDir);
+ ctx.setEnforcer(enforcer);
+
+ phyPlanner = new PhysicalPlannerImpl(conf);
+ exec = phyPlanner.createPlan(ctx, rootNode);
+ exec.init();
+ exec.next();
+ exec.close();
+
+ assertTrue(exec instanceof SortAggregateExec);
+ }
+}
\ No newline at end of file
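
The enforcer tests near the end of TestPhysicalPlanner (the testEnforceFor*ColumnPartitionStorePlan pair, testSortEnforcer, and testGroupByEnforcer) all rely on one mechanism: an Enforcer pins a physical algorithm to a logical node's PID before physical planning, and the test then asserts on the concrete PhysicalExec subclass the planner chose. A condensed sketch, again limited to calls that appear in the diff above and assuming the surrounding test fixtures:

    // Locate the sort node in the optimized logical plan.
    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);

    // Pin the algorithm: IN_MEMORY_SORT yields MemSortExec; MERGE_SORT yields ExternalSortExec.
    Enforcer enforcer = new Enforcer();
    enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);

    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
        LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
        new FileFragment[] {frags[0]}, workDir);
    ctx.setEnforcer(enforcer);

    // The physical planner consults the enforcer when choosing operator implementations.
    PhysicalExec exec = new PhysicalPlannerImpl(conf).createPlan(ctx, rootNode);
    assertTrue(exec instanceof MemSortExec);
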
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
new file mode 100644
index 0000000..d79d292
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestProgressExternalSortExec {
+ private TajoConf conf;
+ private TajoTestingCluster util;
+ private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestProgressExternalSortExec";
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private Path testDir;
+
+ private final int numTuple = 100000;
+ private Random rnd = new Random(System.currentTimeMillis());
+
+ private TableDesc employee;
+
+ private TableStats testDataStats;
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = new TajoConf();
+ util = new TajoTestingCluster();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
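+ // route worker temporal (intermediate) data under the test dir so the
+ // external sort's spilled runs are cleaned up with the test directory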
+ conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+
+ Schema schema = new Schema();
+ schema.addColumn("managerid", TajoDataTypes.Type.INT4);
+ schema.addColumn("empid", TajoDataTypes.Type.INT4);
+ schema.addColumn("deptname", TajoDataTypes.Type.TEXT);
+
+ TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
+ Path employeePath = new Path(testDir, "employee.csv");
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(employeeMeta, schema, employeePath);
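+ // collect stats while writing so the assertions below can compare the
+ // executor's reported input stats against the exact bytes and rows written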
+ appender.enableStats();
+ appender.init();
+ VTuple tuple = new VTuple(schema.size());
+ for (int i = 0; i < numTuple; i++) {
+ tuple.put(new Datum[] {
+ DatumFactory.createInt4(rnd.nextInt(50)),
+ DatumFactory.createInt4(rnd.nextInt(100)),
+ DatumFactory.createText("dept_" + i),
+ });
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+ testDataStats = appender.getStats();
+ employee = new TableDesc(
+ CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta,
+ employeePath.toUri());
+ catalog.createTable(employee);
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ CommonTestingUtil.cleanupTestDir(TEST_PATH);
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select managerId, empId from employee order by managerId, empId"
+ };
+
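+ // the buffer size passed to testProgress() picks the execution path: a
+ // buffer well above the input size keeps the sort in memory, while a buffer
+ // equal to the input size forces spilling and pair-wise merging of runs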
+ @Test
+ public void testExternalSortExecProgressWithMemTableScanner() throws Exception {
+ testProgress(testDataStats.getNumBytes().intValue() * 20); // 20x the input size so the whole sort fits in memory
+ }
+
+ @Test
+ public void testExternalSortExecProgressWithPairWiseMerger() throws Exception {
+ testProgress(testDataStats.getNumBytes().intValue());
+ }
+
+ private void testProgress(int sortBufferBytesNum) throws Exception {
+ FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+ new Path(employee.getUri()), Integer.MAX_VALUE);
+ Path workDir = new Path(testDir, TestProgressExternalSortExec.class.getName());
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+
+ // TODO - this should be planned according to the user's optimization hint
+ if (!(proj.getChild() instanceof ExternalSortExec)) {
+ UnaryPhysicalExec sortExec = proj.getChild();
+ SeqScanExec scan = sortExec.getChild();
+
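+ // swap the planner-chosen in-memory sort for an external sort so that the
+ // sort buffer size under test actually takes effect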
+ ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan);
+
+ extSort.setSortBufferBytesNum(sortBufferBytesNum);
+ proj.setChild(extSort);
+ } else {
+ ((ExternalSortExec)proj.getChild()).setSortBufferBytesNum(sortBufferBytesNum);
+ }
+
+ Tuple tuple;
+ Tuple preVal = null;
+ Tuple curVal;
+ int cnt = 0;
+ exec.init();
+ BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
+ new SortSpec[]{
+ new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)),
+ new SortSpec(new Column("empid", TajoDataTypes.Type.INT4))
+ });
+
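+ // once the first tuple comes out, the sort phase is done, so progress
+ // should already be past the halfway mark and only grow from there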
+ float initProgress = 0.0f;
+ while ((tuple = exec.next()) != null) {
+ if (cnt == 0) {
+ initProgress = exec.getProgress();
+ assertTrue(initProgress > 0.5f && initProgress < 1.0f);
+ }
+
+ if (cnt == testDataStats.getNumRows() / 2) {
+ float progress = exec.getProgress();
+
+ assertTrue(progress > initProgress);
+ }
+ curVal = tuple;
+ if (preVal != null) {
+ assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+ }
+ preVal = curVal;
+ cnt++;
+ }
+
+ assertEquals(1.0f, exec.getProgress(), 0);
+ assertEquals(numTuple, cnt);
+
+ TableStats tableStats = exec.getInputStats();
+ assertNotNull(tableStats);
+ assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+ assertEquals(cnt, testDataStats.getNumRows().longValue());
+ assertEquals(cnt, tableStats.getNumRows().longValue());
+ assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+
+ // rescan: the executor must reproduce the same sorted output and progress
+ preVal = null;
+ exec.rescan();
+
+ cnt = 0;
+ while ((tuple = exec.next()) != null) {
+ curVal = tuple;
+ if (preVal != null) {
+ assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+ }
+ preVal = curVal;
+ cnt++;
+ }
+ assertEquals(1.0f, exec.getProgress(), 0);
+ assertEquals(numTuple, cnt);
+ exec.close();
+ assertEquals(1.0f, exec.getProgress(), 0);
+
+ tableStats = exec.getInputStats();
+ assertNotNull(tableStats);
+ assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+ assertEquals(cnt, testDataStats.getNumRows().longValue());
+ assertEquals(cnt, tableStats.getNumRows().longValue());
+ assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
new file mode 100644
index 0000000..fe36602
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+// right outer join has no dedicated physical operator of its own; it reuses HashLeftOuterJoinExec with the input order switched
+public class TestRightOuterHashJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuterHashJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private Path testDir;
+ private QueryContext defaultContext;
+
+ private TableDesc dep3;
+ private TableDesc job3;
+ private TableDesc emp3;
+
+ private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+ private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+ private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dept_0 | 1000
+ // 1 | dept_1 | 1001
+ // 2 | dept_2 | 1002
+ // 3 | dept_3 | 1003
+ // 4 | dept_4 | 1004
+ // 5 | dept_5 | 1005
+ // 6 | dept_6 | 1006
+ // 7 | dept_7 | 1007
+ // 8 | dept_8 | 1008
+ // 9 | dept_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT");
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(dep3Meta, dep3Schema, dep3Path);
+ appender1.init();
+ VTuple tuple = new VTuple(dep3Schema.size());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+ catalog.createTable(dep3);
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT");
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(job3Meta, job3Schema, job3Path);
+ appender2.init();
+ VTuple tuple2 = new VTuple(job3Schema.size());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+ catalog.createTable(job3);
+
+ //---------------------emp3 --------------------
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | firstname_11 | lastname_11 | 1 | 123 | 101
+ // 13 | firstname_13 | lastname_13 | 3 | 369 | 103
+ // 15 | firstname_15 | lastname_15 | 5 | 615 | null
+ // 17 | firstname_17 | lastname_17 | 7 | 861 | null
+ // 19 | firstname_19 | lastname_19 | 9 | 1107 | null
+ // 21 | firstname_21 | lastname_21 | 1 | 123 | 101
+ // 23 | firstname_23 | lastname_23 | 3 | 369 | 103
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT");
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
+ .getAppender(emp3Meta, emp3Schema, emp3Path);
+ appender3.init();
+ VTuple tuple3 = new VTuple(emp3Schema.size());
+
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+ catalog.createTable(emp3);
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+ defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id", //0 no nulls
+ "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id", //1 nulls on the left operand
+ "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id" //2 nulls on the right side
+ };
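+
+ // expected row counts: query 0 keeps every dep3 row (dep_ids 1 and 3 match
+ // two employees each, 5/7/9 one each, and 0/2/4/6/8 are null-padded -> 12);
+ // query 1 keeps every job3 row (101 and 103 match two employees each, 102
+ // none -> 5); query 2 keeps every emp3 row -> 7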
+
+ @Test
+ public final void testRightOuter_HashJoinExec0() throws IOException, TajoException {
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
+ Integer.MAX_VALUE);
+ FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuter_HashJoinExec0");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ // a merge join should not be chosen for such a small data set
+ fail("RightOuterMergeJoinExec was chosen for a small data set");
+ } else {
+ int count = 0;
+ exec.init();
+
+ while (exec.next() != null) {
+ // TODO - verify the tuple contents, not just the row count
+ count++;
+ }
+ exec.close();
+ assertEquals(12, count);
+ }
+ }
+
+
+ @Test
+ public final void testRightOuter_HashJoinExec1() throws IOException, TajoException {
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
+ Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuter_HashJoinExec1");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ // a merge join should not be chosen for such a small data set
+ fail("RightOuterMergeJoinExec was chosen for a small data set");
+ } else {
+ int count = 0;
+ exec.init();
+
+ while (exec.next() != null) {
+ // TODO - verify the tuple contents, not just the row count
+ count++;
+ }
+ exec.close();
+ assertEquals(5, count);
+ }
+ }
+
+ @Test
+ public final void testRightOuter_HashJoinExec2() throws IOException, TajoException {
+
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
+ Integer.MAX_VALUE);
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuter_HashJoinExec2");
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[2]);
+ LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof RightOuterMergeJoinExec) {
+ // a merge join should not be chosen for such a small data set
+ fail("RightOuterMergeJoinExec was chosen for a small data set");
+ } else {
+ int count = 0;
+ exec.init();
+
+ while (exec.next() != null) {
+ // TODO - verify the tuple contents, not just the row count
+ count++;
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+ }
+}