You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/16 13:06:11 UTC
[4/6] TAJO-907: Implement off-heap tuple block and zero-copy tuple.
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
new file mode 100644
index 0000000..7bd6a70
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java
@@ -0,0 +1,128 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.annotation.NotNull;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+class MemTableScanner implements Scanner {
+ Iterable<Tuple> iterable;
+ Iterator<Tuple> iterator;
+ long inputBytes;
+
+ // for input stats
+ float scannerProgress;
+ int numRecords;
+ int totalRecords;
+ TableStats scannerTableStats;
+
+ public MemTableScanner(@NotNull Collection<Tuple> iterable, long inputBytes) {
+ Preconditions.checkNotNull(iterable);
+ this.iterable = iterable;
+ totalRecords = iterable.size();
+ this.inputBytes = inputBytes;
+ }
+
+ @Override
+ public void init() throws IOException {
+ scannerProgress = 0.0f;
+ numRecords = 0;
+
+ // it will be returned as the final stats
+ scannerTableStats = new TableStats();
+ scannerTableStats.setNumBytes(inputBytes);
+ scannerTableStats.setReadBytes(inputBytes);
+ scannerTableStats.setNumRows(totalRecords);
+
+ iterator = iterable.iterator();
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if (iterator.hasNext()) {
+ numRecords++;
+ return new VTuple(iterator.next());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+
+ @Override
+ public void close() throws IOException {
+ scannerProgress = 1.0f;
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return false;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return null;
+ }
+
+ @Override
+ public float getProgress() {
+ if (numRecords > 0) {
+ return (float)numRecords / (float)totalRecords;
+
+ } else { // if an input is empty
+ return scannerProgress;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return scannerTableStats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index e1cc6a8..bf95d19 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -25,9 +25,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -52,7 +52,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
private List<Tuple> rightTupleSlots;
private JoinTupleComparator joincomparator = null;
- private TupleComparator[] tupleComparator = null;
+ private BaseTupleComparator[] tupleComparator = null;
private final static int INITIAL_TUPLE_SLOT = 10000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index bbfe973..d68597d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -24,9 +24,9 @@ import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -53,7 +53,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
private Iterator<Tuple> innerIterator;
private JoinTupleComparator joincomparator = null;
- private TupleComparator[] tupleComparator = null;
+ private BaseTupleComparator[] tupleComparator = null;
private final static int INITIAL_TUPLE_SLOT = 10000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
new file mode 100644
index 0000000..2ac8662
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
@@ -0,0 +1,250 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
+ */
+class PairWiseMerger implements Scanner {
+ private static final Log LOG = LogFactory.getLog(PairWiseMerger.class);
+
+ private Scanner leftScan;
+ private Scanner rightScan;
+
+ private VTuple outTuple;
+ private VTuple leftTuple;
+ private VTuple rightTuple;
+
+ private final Schema schema;
+ private final Comparator<Tuple> comparator;
+
+ private float mergerProgress;
+ private TableStats mergerInputStats;
+
+ private static enum State {
+ NEW,
+ INITED,
+ CLOSED
+ }
+
+ private State state = State.NEW;
+
+ public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, Comparator<Tuple> comparator)
+ throws IOException {
+ this.schema = schema;
+ this.leftScan = leftScanner;
+ this.rightScan = rightScanner;
+ this.comparator = comparator;
+ }
+
+ private void setState(State state) {
+ this.state = state;
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (state == State.NEW) {
+ leftScan.init();
+ rightScan.init();
+
+ prepareTuplesForFirstComparison();
+
+ mergerInputStats = new TableStats();
+ mergerProgress = 0.0f;
+
+ setState(State.INITED);
+ } else {
+ throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
+ }
+ }
+
+ private void prepareTuplesForFirstComparison() throws IOException {
+ Tuple lt = leftScan.next();
+ if (lt != null) {
+ leftTuple = new VTuple(lt);
+ } else {
+ leftTuple = null; // TODO - missed free
+ }
+
+ Tuple rt = rightScan.next();
+ if (rt != null) {
+ rightTuple = new VTuple(rt);
+ } else {
+ rightTuple = null; // TODO - missed free
+ }
+ }
+
+ public Tuple next() throws IOException {
+
+ if (leftTuple != null && rightTuple != null) {
+ if (comparator.compare(leftTuple, rightTuple) < 0) {
+ outTuple = new VTuple(leftTuple);
+
+ Tuple lt = leftScan.next();
+ if (lt != null) {
+ leftTuple = new VTuple(lt);
+ } else {
+ leftTuple = null; // TODO - missed free
+ }
+ } else {
+ outTuple = new VTuple(rightTuple);
+
+ Tuple rt = rightScan.next();
+ if (rt != null) {
+ rightTuple = new VTuple(rt);
+ } else {
+ rightTuple = null; // TODO - missed free
+ }
+ }
+ return outTuple;
+ }
+
+ if (leftTuple == null) {
+ if (rightTuple != null) {
+ outTuple = new VTuple(rightTuple);
+ } else {
+ outTuple = null;
+ }
+
+ Tuple rt = rightScan.next();
+ if (rt != null) {
+ rightTuple = new VTuple(rt);
+ } else {
+ rightTuple = null; // TODO - missed free
+ }
+ } else {
+ if (leftTuple != null) {
+ outTuple = new VTuple(leftTuple);
+ } else {
+ outTuple = null;
+ }
+
+ Tuple lt = leftScan.next();
+ if (lt != null) {
+ leftTuple = new VTuple(lt);
+ } else {
+ leftTuple = null; // TODO - missed free
+ }
+ }
+ return outTuple;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (state == State.INITED) {
+ leftScan.reset();
+ rightScan.reset();
+
+ outTuple = null;
+ leftTuple = null;
+ rightTuple = null;
+
+ prepareTuplesForFirstComparison();
+ } else {
+ throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
+ }
+ }
+
+ public void close() throws IOException {
+ IOUtils.cleanup(PairWiseMerger.LOG, leftScan, rightScan);
+ getInputStats();
+ leftScan = null;
+ rightScan = null;
+ mergerProgress = 1.0f;
+ setState(State.CLOSED);
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return false;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public float getProgress() {
+ if (leftScan == null) {
+ return mergerProgress;
+ }
+ return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (leftScan == null) {
+ return mergerInputStats;
+ }
+ TableStats leftInputStats = leftScan.getInputStats();
+ if (mergerInputStats == null) {
+ mergerInputStats = new TableStats();
+ }
+ mergerInputStats.setNumBytes(0);
+ mergerInputStats.setReadBytes(0);
+ mergerInputStats.setNumRows(0);
+
+ if (leftInputStats != null) {
+ mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+ mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+ mergerInputStats.setNumRows(leftInputStats.getNumRows());
+ }
+
+ TableStats rightInputStats = rightScan.getInputStats();
+ if (rightInputStats != null) {
+ mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+ mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+ mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+ }
+
+ return mergerInputStats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 68379d1..786c726 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -47,7 +47,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
private Schema keySchema;
private BSTIndex.BSTIndexWriter indexWriter;
- private TupleComparator comp;
+ private BaseTupleComparator comp;
private FileAppender appender;
private TableMeta meta;
@@ -71,7 +71,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
}
BSTIndex bst = new BSTIndex(new TajoConf());
- this.comp = new TupleComparator(keySchema, sortSpecs);
+ this.comp = new BaseTupleComparator(keySchema, sortSpecs);
Path storeTablePath = new Path(context.getWorkDir(), "output");
LOG.info("Output data directory: " + storeTablePath);
this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 5d4dad5..bc945de 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -26,9 +26,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -52,7 +52,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
private List<Tuple> innerTupleSlots;
private JoinTupleComparator joinComparator = null;
- private TupleComparator[] tupleComparator = null;
+ private BaseTupleComparator[] tupleComparator = null;
private final static int INITIAL_TUPLE_SLOT = 10000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 2f0c12f..122d4f3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -34,7 +34,7 @@ import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.engine.utils.TupleCacheKey;
import org.apache.tajo.engine.utils.TupleUtil;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index a4a8d37..e261e0c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -18,11 +18,12 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import java.io.IOException;
import java.util.Comparator;
@@ -35,7 +36,13 @@ public abstract class SortExec extends UnaryPhysicalExec {
Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) {
super(context, inSchema, outSchema, child);
this.sortSpecs = sortSpecs;
- this.comparator = new TupleComparator(inSchema, sortSpecs);
+
+ BaseTupleComparator comp = new BaseTupleComparator(inSchema, sortSpecs);
+ if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+ this.comparator = context.getSharedResource().getCompiledComparator(inSchema, comp);
+ } else {
+ this.comparator = comp;
+ }
}
public SortSpec[] getSortSpecs() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index fd0c04f..3199b56 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -68,10 +68,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
}
PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta);
-
- if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
- maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
- }
+ maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
openNewFile(writtenFileNum);
sumStats = new TableStats();
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 7aeed13..2945185 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -27,8 +27,8 @@ import org.apache.tajo.engine.eval.WindowFunctionEval;
import org.apache.tajo.engine.function.FunctionContext;
import org.apache.tajo.engine.planner.logical.WindowAggNode;
import org.apache.tajo.engine.planner.logical.WindowSpec;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -265,7 +265,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
}
private void evaluationWindowFrame() {
- TupleComparator comp;
+ BaseTupleComparator comp;
evaluatedTuples = new ArrayList<Tuple>();
@@ -285,9 +285,9 @@ public class WindowAggExec extends UnaryPhysicalExec {
for (int idx = 0; idx < functions.length; idx++) {
if (orderedFuncFlags[idx]) {
- comp = new TupleComparator(inSchema, functions[idx].getSortSpecs());
+ comp = new BaseTupleComparator(inSchema, functions[idx].getSortSpecs());
Collections.sort(accumulatedInTuples, comp);
- comp = new TupleComparator(schemaForOrderBy, functions[idx].getSortSpecs());
+ comp = new BaseTupleComparator(schemaForOrderBy, functions[idx].getSortSpecs());
Collections.sort(evaluatedTuples, comp);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index ec5df04..7e26a22 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -29,7 +29,7 @@ import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.util.TUtil;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
deleted file mode 100644
index 981b572..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-public class SchemaUtil {
- // See TAJO-914 bug.
- //
- // Its essential problem is that constant value is evaluated multiple times at each scan.
- // As a result, join nodes can take the child nodes which have the same named fields.
- // Because current schema does not allow the same name and ignore the duplicated schema,
- // it finally causes the in-out schema mismatch between the parent and child nodes.
- //
- // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields.
- // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
- static int tmpColumnSeq = 0;
- public static Schema merge(Schema left, Schema right) {
- Schema merged = new Schema();
- for(Column col : left.getColumns()) {
- if (!merged.containsByQualifiedName(col.getQualifiedName())) {
- merged.addColumn(col);
- }
- }
- for(Column col : right.getColumns()) {
- if (merged.containsByQualifiedName(col.getQualifiedName())) {
- merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
- } else {
- merged.addColumn(col);
- }
- }
-
- // if overflow
- if (tmpColumnSeq < 0) {
- tmpColumnSeq = 0;
- }
- return merged;
- }
-
- /**
- * Get common columns to be used as join keys of natural joins.
- */
- public static Schema getNaturalJoinColumns(Schema left, Schema right) {
- Schema common = new Schema();
- for (Column outer : left.getColumns()) {
- if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
- common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
- }
- }
-
- return common;
- }
-
- public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
- Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
- if (tableName != null) {
- logicalSchema.setQualifier(tableName);
- }
- return logicalSchema;
- }
-
- public static <T extends Schema> T clone(Schema schema) {
- try {
- T copy = (T) schema.clone();
- return copy;
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index e77e265..4b02777 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -30,6 +30,8 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.Pair;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,10 +87,10 @@ public class ExecutionBlockSharedResource {
}
public EvalNode compileEval(Schema schema, EvalNode eval) {
- return compilationContext.getCompiler().compile(schema, eval);
+ return compilationContext.getEvalCompiler().compile(schema, eval);
}
- public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) {
+ public EvalNode getCompiledEval(Schema schema, EvalNode eval) {
if (codeGenEnabled) {
Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval);
@@ -96,7 +98,7 @@ public class ExecutionBlockSharedResource {
return compilationContext.getPrecompiedEvals().get(key);
} else {
try {
- LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval);
+ LOG.warn(eval.toString() + " does not exist. Compiling it immediately.");
return compileEval(schema, eval);
} catch (Throwable t) {
LOG.warn(t);
@@ -104,10 +106,37 @@ public class ExecutionBlockSharedResource {
}
}
} else {
- throw new IllegalStateException("CodeGen is disabled");
+ throw new IllegalStateException("CODEGEN is disabled");
}
}
+ public TupleComparator compileComparator(Schema schema, BaseTupleComparator comp) {
+ return compilationContext.getComparatorCompiler().compile(comp, false);
+ }
+
+  /**
+   * Looks up the comparator registered in the compilation context under the key
+   * {@code (schema, comp)}. When no entry exists, the comparator is compiled on the spot; if that
+   * compilation throws, the failure is logged and the given {@code comp} is returned as-is.
+   *
+   * @param schema schema part of the lookup key
+   * @param comp   comparator part of the lookup key, also used as the fallback result
+   * @return the precompiled comparator, a freshly compiled one, or {@code comp} itself
+   * @throws IllegalStateException if code generation is not enabled
+   */
+  public TupleComparator getCompiledComparator(Schema schema, BaseTupleComparator comp) {
+    if (!codeGenEnabled) {
+      throw new IllegalStateException("CODEGEN is disabled");
+    }
+
+    Pair<Schema, BaseTupleComparator> key = new Pair<Schema, BaseTupleComparator>(schema, comp);
+    if (compilationContext.getPrecompiedComparators().containsKey(key)) {
+      return compilationContext.getPrecompiedComparators().get(key);
+    }
+
+    try {
+      LOG.warn(comp + " does not exist. Compiling it immediately");
+      return compileComparator(schema, comp);
+    } catch (Throwable t) {
+      // Fall back to the non-compiled comparator rather than failing the caller.
+      LOG.warn(t);
+      return comp;
+    }
+  }
+
+  /** @return the class loader held by this shared resource */
+  public TajoClassLoader getClassLoader() {
+    return this.classLoader;
+  }
+
public void release() {
compilationContext = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index be33a12..2b58196 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.worker.dataserver.retriever.FileChunk;
import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
@@ -57,10 +57,10 @@ public class RangeRetrieverHandler implements RetrieverHandler {
private final File file;
private final BSTIndex.BSTIndexReader idxReader;
private final Schema schema;
- private final TupleComparator comp;
+ private final BaseTupleComparator comp;
private final RowStoreDecoder decoder;
- public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
+ public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException {
this.file = outDir;
BSTIndex index = new BSTIndex(new TajoConf());
this.schema = schema;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 7b4cbe1..d2cb60d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -49,8 +49,9 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.RawFile;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -94,7 +95,9 @@ public class Task {
// TODO - to be refactored
private ShuffleType shuffleType = null;
private Schema finalSchema = null;
- private TupleComparator sortComp = null;
+
+ private BaseTupleComparator sortComp = null;
+ private ClientSocketChannelFactory channelFactory = null;
static final String OUTPUT_FILE_PREFIX="part-";
static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -175,7 +178,7 @@ public class Task {
if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
- this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+ this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
// The final result of a task will be written in a file named part-ss-nnnnnnn,
@@ -685,7 +688,7 @@ public class Task {
if (!storeDir.exists()) {
storeDir.mkdirs();
}
- storeFile = new File(storeDir, "in_" + i);
+ storeFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory);
runnerList.add(fetcher);
i++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 422ec2b..4973ff8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -133,6 +133,12 @@ public class TaskAttemptContext {
final Fragment [] fragments, final Path workDir) {
this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
}
+ /**
+  * Test-only constructor: builds the context from in-memory fragments (converted to their
+  * proto form) and additionally installs the given execution-block shared resource.
+  *
+  * @param queryContext query-level context handed to the main constructor
+  * @param queryId      id of the query unit attempt this context belongs to
+  * @param fragments    input fragments, converted via FragmentConvertor
+  * @param workDir      working directory of the task attempt
+  * @param resource     shared resource stored in this context
+  */
+ @VisibleForTesting
+ public TaskAttemptContext(final QueryContext queryContext,
+                           final QueryUnitAttemptId queryId,
+                           final Fragment [] fragments,
+                           final Path workDir,
+                           ExecutionBlockSharedResource resource) {
+   this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+   sharedResource = resource;
+ }
public TajoConf getConf() {
return queryContext.getConf();
@@ -167,13 +173,9 @@ public class TaskAttemptContext {
return sharedResource;
}
- public EvalNode compileEval(Schema schema, EvalNode eval) {
- return sharedResource.compileEval(schema, eval);
- }
-
public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) {
if (sharedResource != null) {
- return sharedResource.getPreCompiledEval(schema, eval);
+ return sharedResource.getCompiledEval(schema, eval);
} else {
LOG.debug("Shared resource is not initialized. It is NORMAL in unit tests");
return eval;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
new file mode 100644
index 0000000..6d9b135
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java
@@ -0,0 +1,352 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.codegen;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type.*;
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
+import static org.junit.Assert.assertTrue;
+
+public class TestTupleComparerCompiler {
+ // Class loader created in setUp() and cleaned in tearDown().
+ private static TajoClassLoader classLoader;
+ // Comparator compiler created in setUp() on top of classLoader.
+ private static TupleComparerCompiler compiler;
+
+ /** Creates the class loader and the comparator compiler built on it, once per test class. */
+ @BeforeClass
+ public static void setUp() {
+   final TajoClassLoader loader = new TajoClassLoader();
+   classLoader = loader;
+   compiler = new TupleComparerCompiler(loader);
+ }
+
+ /** Drops the compiler reference, then cleans and drops the class loader. */
+ @AfterClass
+ public static void tearDown() throws Throwable {
+   final TajoClassLoader loader = classLoader;
+   compiler = null;
+
+   loader.clean();
+   classLoader = null;
+ }
+
+ /**
+  * Builds four single-key sort specifications on the named schema column, one for each
+  * combination of the two boolean SortSpec flags (presumably ascending and null-first,
+  * in that order -- confirm against the SortSpec constructor).
+  *
+  * @param columnName name of the column looked up in the shared test schema
+  * @return four one-element SortSpec arrays
+  */
+ private SortSpec [][] createSortSpecs(String columnName) {
+   final Column sortColumn = schema.getColumn(columnName);
+   return new SortSpec[][] {
+       {new SortSpec(sortColumn, true, false)},
+       {new SortSpec(sortColumn, true, true)},
+       {new SortSpec(sortColumn, false, false)},
+       {new SortSpec(sortColumn, false, true)}
+   };
+ }
+
+ /**
+  * Compiles one comparator per sort-spec array using the shared compiler.
+  *
+  * @param sortSpecs   sort specifications; each element yields one comparator
+  * @param unsafeTuple flag forwarded to the compiler (presumably selects code generation
+  *                    for UnSafeTuple inputs -- confirm against TupleComparerCompiler)
+  * @return compiled comparators, index-aligned with {@code sortSpecs}
+  */
+ private TupleComparator [] createComparators(SortSpec [][] sortSpecs, boolean unsafeTuple) {
+   final TupleComparator [] compiled = new TupleComparator[sortSpecs.length];
+   int idx = 0;
+   for (SortSpec [] specs : sortSpecs) {
+     compiled[idx++] = compiler.compile(new BaseTupleComparator(schema, specs), unsafeTuple);
+   }
+   return compiled;
+ }
+
+ /**
+  * Runs assertCompare for every comparator against its index-aligned sort specification.
+  *
+  * @param comps     comparators to check
+  * @param sortSpecs sort specifications; must have the same length as {@code comps}
+  * @param tuples    exactly five tuples, laid out as assertCompare expects
+  */
+ private void assertCompareAll(TupleComparator [] comps, SortSpec [][] sortSpecs, Tuple...tuples) {
+   Preconditions.checkArgument(comps.length == sortSpecs.length);
+   Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5");
+
+   int idx = 0;
+   for (TupleComparator comparator : comps) {
+     assertCompare(comparator, sortSpecs[idx++], tuples);
+   }
+ }
+
+ /**
+  * Checks a comparator against five tuples, using the direction and null placement of the
+  * first sort specification.
+  *
+  * @param comp      comparator under test
+  * @param sortSpecs sort specifications; only the first one is inspected here
+  * @param tuples    exactly five tuples: [0] and [1] must compare equal, [2] must sort after
+  *                  [0] in ascending order, and [3] and [4] must compare equal to each other
+  *                  and be placed relative to [0] according to the null-first flag (the
+  *                  callers pass tuples holding a NULL sort key)
+  */
+ private void assertCompare(TupleComparator comp, SortSpec [] sortSpecs, Tuple...tuples) {
+   Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5");
+
+   // Equality does not depend on the sort direction.
+   assertTrue("Checking Equality", comp.compare(tuples[0], tuples[1]) == 0);
+
+   if (sortSpecs[0].isAscending()) {
+     assertTrue("Checking Less Than (ascending)", comp.compare(tuples[0], tuples[2]) < 0);
+     assertTrue("Checking Greater Than (ascending)", comp.compare(tuples[2], tuples[0]) > 0);
+   } else {
+     // Descending order flips the sign of both comparisons.
+     assertTrue("Checking Greater Than (descending)", comp.compare(tuples[0], tuples[2]) > 0);
+     assertTrue("Checking Less Than (descending)", comp.compare(tuples[2], tuples[0]) < 0);
+   }
+
+   if (sortSpecs[0].isNullFirst()) {
+     assertTrue("Checking Value After Null (nulls first)", comp.compare(tuples[0], tuples[3]) > 0);
+     assertTrue("Checking Null Before Value (nulls first)", comp.compare(tuples[3], tuples[0]) < 0);
+   } else {
+     assertTrue("Checking Value Before Null (nulls last)", comp.compare(tuples[0], tuples[3]) < 0);
+     assertTrue("Checking Null After Value (nulls last)", comp.compare(tuples[3], tuples[0]) > 0);
+   }
+
+   assertTrue("Checking Null Equality", comp.compare(tuples[3], tuples[4]) == 0);
+   assertTrue("Checking Null Equality", comp.compare(tuples[4], tuples[3]) == 0);
+ }
+
+ /** Compares tuples on the single key col0 using boolean datums, for all four sort specs. */
+ @Test
+ public void testCompareOneBool() throws Exception {
+   final SortSpec [][] specs = createSortSpecs("col0");
+   final TupleComparator [] comparators = createComparators(specs, false);
+   final int keyColumn = 0;
+
+   Tuple base = new VTuple(schema.size());
+   base.put(keyColumn, DatumFactory.createBool(true));
+
+   Tuple sameAsBase = new VTuple(schema.size());
+   sameAsBase.put(keyColumn, DatumFactory.createBool(true));
+
+   Tuple other = new VTuple(schema.size());
+   other.put(keyColumn, DatumFactory.createBool(false));
+
+   Tuple nullKey = new VTuple(schema.size());
+   nullKey.put(keyColumn, NullDatum.get());
+
+   // The same null-key tuple fills both null slots.
+   assertCompareAll(comparators, specs, base, sameAsBase, other, nullKey, nullKey);
+ }
+
+ /** Compares tuples on the single key col2 using INT2 datums, for all four sort specs. */
+ @Test
+ public void testCompareOneInt() throws Exception {
+   final SortSpec [][] specs = createSortSpecs("col2");
+   final TupleComparator [] comparators = createComparators(specs, false);
+   final int keyColumn = 2;
+
+   Tuple base = new VTuple(schema.size());
+   base.put(keyColumn, DatumFactory.createInt2((short) 1));
+
+   Tuple sameAsBase = new VTuple(schema.size());
+   sameAsBase.put(keyColumn, DatumFactory.createInt2((short) 1));
+
+   Tuple larger = new VTuple(schema.size());
+   larger.put(keyColumn, DatumFactory.createInt2((short) 2));
+
+   // Two distinct tuples that both hold a NULL key.
+   Tuple firstNull = new VTuple(schema.size());
+   firstNull.put(keyColumn, NullDatum.get());
+
+   Tuple secondNull = new VTuple(schema.size());
+   secondNull.put(keyColumn, NullDatum.get());
+
+   assertCompareAll(comparators, specs, base, sameAsBase, larger, firstNull, secondNull);
+ }
+
+ /**
+  * Compares tuples on the composite key (col2, col3) with a comparator compiled by the
+  * shared compiler, so that no extra class loader is created and left uncleaned.
+  */
+ @Test
+ public void testCompareTwoInts() throws Exception {
+   SortSpec[] sortSpecs = new SortSpec[] {
+       new SortSpec(new Column("col2", INT2)),
+       new SortSpec(new Column("col3", INT4))};
+
+   BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs);
+
+   // Reuse the class-level compiler: its class loader is cleaned in tearDown().
+   TupleComparator compiled = compiler.compile(comparator, false);
+
+   // t1 and t2 carry identical keys.
+   Tuple t1 = new VTuple(schema.size());
+   t1.put(2, DatumFactory.createInt2((short) 1));
+   t1.put(3, DatumFactory.createInt4(1));
+
+   Tuple t2 = new VTuple(schema.size());
+   t2.put(2, DatumFactory.createInt2((short) 1));
+   t2.put(3, DatumFactory.createInt4(1));
+
+   // t3 differs from t1 in the first key only.
+   Tuple t3 = new VTuple(schema.size());
+   t3.put(2, DatumFactory.createInt2((short) 2));
+   t3.put(3, DatumFactory.createInt4(1));
+
+   // t4 differs from t1 in the second key only.
+   Tuple t4 = new VTuple(schema.size());
+   t4.put(2, DatumFactory.createInt2((short) 1));
+   t4.put(3, DatumFactory.createInt4(2));
+
+   // t5 has a NULL first key.
+   Tuple t5 = new VTuple(schema.size());
+   t5.put(2, NullDatum.get());
+   t5.put(3, DatumFactory.createInt4(2));
+
+   // t6 has a NULL second key.
+   Tuple t6 = new VTuple(schema.size());
+   t6.put(2, DatumFactory.createInt2((short) 1));
+   t6.put(3, NullDatum.get());
+
+   assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5);
+   assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5);
+   assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6);
+ }
+
+ /** Compares tuples on the single key col4 using FLOAT4 datums, for all four sort specs. */
+ @Test
+ public void testCompareOneFloat4() throws Exception {
+   final SortSpec [][] specs = createSortSpecs("col4");
+   final TupleComparator [] comparators = createComparators(specs, false);
+   final int keyColumn = 4;
+
+   Tuple base = new VTuple(schema.size());
+   base.put(keyColumn, DatumFactory.createFloat4(1.0f));
+
+   Tuple sameAsBase = new VTuple(schema.size());
+   sameAsBase.put(keyColumn, DatumFactory.createFloat4(1.0f));
+
+   Tuple larger = new VTuple(schema.size());
+   larger.put(keyColumn, DatumFactory.createFloat4(2.0f));
+
+   Tuple nullKey = new VTuple(schema.size());
+   nullKey.put(keyColumn, NullDatum.get());
+
+   // The same null-key tuple fills both null slots.
+   assertCompareAll(comparators, specs, base, sameAsBase, larger, nullKey, nullKey);
+ }
+
+ /**
+  * Compares tuples on the composite key (col4, col5) with a comparator compiled by the
+  * shared compiler, so that no extra class loader is created and left uncleaned.
+  */
+ @Test
+ public void testCompareFloat4Float8() throws Exception {
+   SortSpec[] sortSpecs = new SortSpec[] {
+       new SortSpec(new Column("col4", FLOAT4)),
+       new SortSpec(new Column("col5", FLOAT8))};
+
+   BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs);
+
+   // Reuse the class-level compiler: its class loader is cleaned in tearDown().
+   TupleComparator compiled = compiler.compile(comparator, false);
+
+   // t1 and t2 carry identical keys.
+   Tuple t1 = new VTuple(schema.size());
+   t1.put(4, DatumFactory.createFloat4(1.0f));
+   t1.put(5, DatumFactory.createFloat8(1.0f));
+
+   Tuple t2 = new VTuple(schema.size());
+   t2.put(4, DatumFactory.createFloat4(1.0f));
+   t2.put(5, DatumFactory.createFloat8(1.0f));
+
+   // t3 differs from t1 in the second key only.
+   Tuple t3 = new VTuple(schema.size());
+   t3.put(4, DatumFactory.createFloat4(1.0f));
+   t3.put(5, DatumFactory.createFloat8(2.0f));
+
+   // t4 differs from t1 in the first key only.
+   Tuple t4 = new VTuple(schema.size());
+   t4.put(4, DatumFactory.createFloat4(2.0f));
+   t4.put(5, DatumFactory.createFloat8(1.0f));
+
+   // t5 has a NULL second key.
+   Tuple t5 = new VTuple(schema.size());
+   t5.put(4, DatumFactory.createFloat4(2.0f));
+   t5.put(5, NullDatum.get());
+
+   // t6 has a NULL first key.
+   Tuple t6 = new VTuple(schema.size());
+   t6.put(4, NullDatum.get());
+   t6.put(5, DatumFactory.createFloat8(1.0f));
+
+   assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5);
+   assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5);
+   assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6);
+ }
+
+ /** Compares tuples on the single key col6 using text datums, for all four sort specs. */
+ @Test
+ public void testCompareText() throws Exception {
+   final SortSpec [][] specs = createSortSpecs("col6");
+   final TupleComparator [] comparators = createComparators(specs, false);
+   final int keyColumn = 6;
+
+   Tuple base = new VTuple(schema.size());
+   base.put(keyColumn, DatumFactory.createText("tajo"));
+
+   Tuple sameAsBase = new VTuple(schema.size());
+   sameAsBase.put(keyColumn, DatumFactory.createText("tajo"));
+
+   Tuple larger = new VTuple(schema.size());
+   larger.put(keyColumn, DatumFactory.createText("tazo"));
+
+   Tuple nullKey = new VTuple(schema.size());
+   nullKey.put(keyColumn, NullDatum.get());
+
+   // The same null-key tuple fills both null slots.
+   assertCompareAll(comparators, specs, base, sameAsBase, larger, nullKey, nullKey);
+ }
+
+ /**
+  * Compares tuples on the composite key (col5, col6) where col5 is NULL in every tuple,
+  * so the text column col6 is the only key that differs between them.
+  */
+ @Test
+ public void testCompareTextWithNull() throws Exception {
+   final SortSpec[] specs = new SortSpec[] {
+       new SortSpec(new Column("col5", FLOAT8)),
+       new SortSpec(new Column("col6", TEXT))};
+   final TupleComparator compiled = compiler.compile(new BaseTupleComparator(schema, specs), false);
+   final int floatColumn = 5;
+   final int textColumn = 6;
+
+   Tuple base = new VTuple(schema.size());
+   base.put(floatColumn, NullDatum.get());
+   base.put(textColumn, DatumFactory.createText("ARGENTINA"));
+
+   Tuple sameAsBase = new VTuple(schema.size());
+   sameAsBase.put(floatColumn, NullDatum.get());
+   sameAsBase.put(textColumn, DatumFactory.createText("ARGENTINA"));
+
+   Tuple larger = new VTuple(schema.size());
+   larger.put(floatColumn, NullDatum.get());
+   larger.put(textColumn, DatumFactory.createText("CANADA"));
+
+   Tuple allNull = new VTuple(schema.size());
+   allNull.put(floatColumn, NullDatum.get());
+   allNull.put(textColumn, NullDatum.get());
+
+   assertCompare(compiled, specs, base, sameAsBase, larger, allNull, allNull);
+ }
+
+ /**
+  * Appends one row to the block: fields 0-5 are skipped and the given text is written as
+  * the next field. When {@code text} is null nothing is written for that field
+  * (presumably leaving it NULL -- confirm against RowWriter).
+  *
+  * @param rowBlock block whose writer receives the row
+  * @param text     text value to write, or null to write no value
+  */
+ private void fillTextColumnToRowBlock(OffHeapRowBlock rowBlock, String text) {
+   final int skippedFields = 6; // fields 0..5 precede the text field
+   final RowWriter rowWriter = rowBlock.getWriter();
+
+   rowWriter.startRow();
+   for (int field = 0; field < skippedFields; field++) {
+     rowWriter.skipField();
+   }
+   if (text != null) {
+     rowWriter.putText(text);
+   }
+   rowWriter.endRow();
+ }
+
+ /**
+  * Compares text keys (col6) read as zero-copy tuples from an off-heap row block, using
+  * comparators compiled with the unsafe-tuple flag set. The row block is released in a
+  * finally clause so it is freed even when an assertion fails.
+  */
+ @Test
+ public void testCompareTextInUnSafeTuple() throws Exception {
+   SortSpec [][] sortSpecs = createSortSpecs("col6");
+   TupleComparator [] comps = createComparators(sortSpecs, true);
+
+   OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 640);
+   try {
+     fillTextColumnToRowBlock(rowBlock, "tajo");
+     fillTextColumnToRowBlock(rowBlock, "tajo");
+     fillTextColumnToRowBlock(rowBlock, "tazo");
+     fillTextColumnToRowBlock(rowBlock, null);
+
+     List<UnSafeTuple> tuples = Lists.newArrayList();
+
+     OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+     reader.reset();
+     // A fresh ZeroCopyTuple per row: each collected tuple keeps its own row.
+     ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+     while (reader.next(zcTuple)) {
+       tuples.add(zcTuple);
+       zcTuple = new ZeroCopyTuple();
+     }
+
+     // Only four rows exist; the row written without text fills both null slots.
+     assertCompareAll(comps, sortSpecs, tuples.get(0), tuples.get(1), tuples.get(2), tuples.get(3), tuples.get(3));
+   } finally {
+     rowBlock.release();
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 4cbdddd..9964a38 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -40,7 +40,7 @@ import org.apache.tajo.engine.plan.EvalTreeProtoSerializer;
import org.apache.tajo.engine.plan.proto.PlanProto;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.LazyTuple;
import org.apache.tajo.storage.Tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index b370be7..7116905 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -31,8 +31,8 @@ import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.function.builtin.SumInt;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
@@ -287,7 +287,7 @@ public class TestPlannerUtil {
FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4));
EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2);
- TupleComparator [] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
+ BaseTupleComparator[] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema);
Tuple t1 = new VTuple(2);
t1.put(0, DatumFactory.createInt4(1));
@@ -297,11 +297,11 @@ public class TestPlannerUtil {
t2.put(0, DatumFactory.createInt4(2));
t2.put(1, DatumFactory.createInt4(3));
- TupleComparator outerComparator = comparators[0];
+ BaseTupleComparator outerComparator = comparators[0];
assertTrue(outerComparator.compare(t1, t2) < 0);
assertTrue(outerComparator.compare(t2, t1) > 0);
- TupleComparator innerComparator = comparators[1];
+ BaseTupleComparator innerComparator = comparators[1];
assertTrue(innerComparator.compare(t1, t2) < 0);
assertTrue(innerComparator.compare(t2, t1) > 0);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index 2294424..12f8892 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
import org.junit.Test;
@@ -483,7 +483,7 @@ public class TestUniformRangePartition {
TupleRange expected = new TupleRange(sortSpecs, s, e);
UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
- TupleComparator comp = new TupleComparator(schema, sortSpecs);
+ BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs);
Tuple tuple = s;
Tuple prevTuple = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index f817776..5b4c696 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -68,7 +68,7 @@ public class TestBSTIndexExec {
private LogicalOptimizer optimizer;
private AbstractStorageManager sm;
private Schema idxSchema;
- private TupleComparator comp;
+ private BaseTupleComparator comp;
private BSTIndex.BSTIndexWriter writer;
private HashMap<Integer , Integer> randomValues ;
private int rndKey = -1;
@@ -104,7 +104,7 @@ public class TestBSTIndexExec {
idxSchema.addColumn("managerid", Type.INT4);
SortSpec[] sortKeys = new SortSpec[1];
sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
- this.comp = new TupleComparator(idxSchema, sortKeys);
+ this.comp = new BaseTupleComparator(idxSchema, sortKeys);
this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index e7aac3c..eb02bfc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -18,8 +18,13 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
@@ -27,29 +32,38 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.storage.*;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockUtils;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.raw.TestDirectRawFile;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.Random;
+import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestExternalSortExec {
+ private static final Log LOG = LogFactory.getLog(TestExternalSortExec.class);
+
private TajoConf conf;
private TajoTestingCluster util;
private final String TEST_PATH = "target/test-data/TestExternalSortExec";
@@ -60,8 +74,6 @@ public class TestExternalSortExec {
private Path testDir;
private final int numTuple = 100000;
- private Random rnd = new Random(System.currentTimeMillis());
-
private TableDesc employee;
@Before
@@ -75,30 +87,22 @@ public class TestExternalSortExec {
conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
sm = StorageManagerFactory.getStorageManager(conf, testDir);
- Schema schema = new Schema();
- schema.addColumn("managerid", Type.INT4);
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("deptname", Type.TEXT);
-
- TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
- Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
- appender.enableStats();
- appender.init();
- Tuple tuple = new VTuple(schema.size());
- for (int i = 0; i < numTuple; i++) {
- tuple.put(new Datum[] {
- DatumFactory.createInt4(rnd.nextInt(50)),
- DatumFactory.createInt4(rnd.nextInt(100)),
- DatumFactory.createText("dept_" + i),
- });
- appender.addTuple(tuple);
- }
- appender.flush();
- appender.close();
+ OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(numTuple);
+ TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.DIRECTRAW);
+
+ Path outFile = new Path(TEST_PATH, "output.draw");
+
+ long startTime = System.currentTimeMillis();
+ Path employeePath = TestDirectRawFile.writeRowBlock(conf, employeeMeta, rowBlock, outFile);
+ long endTime = System.currentTimeMillis();
+ rowBlock.release();
- System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576) +
- " MB)");
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileStatus status = fs.getFileStatus(employeePath);
+ LOG.info("============================================================");
+ LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false) + " " +
+ (endTime - startTime) + " msec");
+ LOG.info("============================================================");
employee = new TableDesc("default.employee", schema, employeeMeta, employeePath);
catalog.createTable(employee);
@@ -113,35 +117,31 @@ public class TestExternalSortExec {
}
String[] QUERIES = {
- "select managerId, empId from employee order by managerId, empId"
+ "select col2, col3, col4 from employee order by col2, col3"
};
@Test
- public final void testNext() throws IOException, PlanningException {
+ public final void testNext() throws IOException, PlanningException, InterruptedException {
+ QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+ queryContext.setBool(SessionVars.CODEGEN, true);
+ queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, 100);
+
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(queryContext, expr);
+ LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+ ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+ resource.initialize(queryContext, rootNode.toJson());
+
FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
- ctx.setEnforcer(new Enforcer());
- Expr expr = analyzer.parse(QUERIES[0]);
- LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = plan.getRootBlock().getRoot();
+ TaskAttemptContext taskContext = new TaskAttemptContext(queryContext,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir, resource);
+ taskContext.setEnforcer(new Enforcer());
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- ProjectionExec proj = (ProjectionExec) exec;
-
- // TODO - should be planed with user's optimization hint
- if (!(proj.getChild() instanceof ExternalSortExec)) {
- UnaryPhysicalExec sortExec = proj.getChild();
- SeqScanExec scan = sortExec.getChild();
-
- ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
- ((MemSortExec)sortExec).getPlan(), scan);
- proj.setChild(extSort);
- }
+ PhysicalExec exec = phyPlanner.createPlan(taskContext, rootNode);
Tuple tuple;
Tuple preVal = null;
@@ -149,10 +149,10 @@ public class TestExternalSortExec {
int cnt = 0;
exec.init();
long start = System.currentTimeMillis();
- TupleComparator comparator = new TupleComparator(proj.getSchema(),
+ BaseTupleComparator comparator = new BaseTupleComparator(exec.getSchema(),
new SortSpec[]{
- new SortSpec(new Column("managerid", Type.INT4)),
- new SortSpec(new Column("empid", Type.INT4))
+ new SortSpec(new Column("col2", Type.INT4)),
+ new SortSpec(new Column("col3", Type.INT8))
});
while ((tuple = exec.next()) != null) {
@@ -180,6 +180,26 @@ public class TestExternalSortExec {
}
assertEquals(numTuple, cnt);
exec.close();
- System.out.println("Sort Time: " + (end - start) + " msc");
+ System.out.println("Sort and final write time: " + (end - start) + " msc");
+ }
+
+ /**
+  * Writes {@code rowNum} generated rows, sorted by {@code comparator}, to a direct raw file
+  * in a test directory and returns a scanner over that file.
+  *
+  * The generated off-heap row block is released once the sorted rows have been written, and
+  * the writer is closed even if adding a tuple fails.
+  *
+  * @param conf       configuration passed to the writer and the scanner
+  * @param meta       table meta passed to the writer and the scanner
+  * @param rowNum     number of rows to generate
+  * @param comparator ordering applied to the generated rows before writing
+  * @return a scanner over the written file (not initialized by this method)
+  * @throws IOException if writing the file or creating the scanner fails
+  */
+ public static DirectRawFileScanner createSortedScanner(TajoConf conf, TableMeta meta, int rowNum,
+                                                        BaseTupleComparator comparator)
+     throws IOException {
+   Path testDir = CommonTestingUtil.getTestDir();
+   Path outFile = new Path(testDir, "file1.out");
+
+   OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+   try {
+     List<Tuple> tupleList = OffHeapRowBlockUtils.sort(rowBlock, comparator);
+
+     DirectRawFileWriter writer1 = new DirectRawFileWriter(conf, schema, meta, outFile);
+     writer1.init();
+     try {
+       for (Tuple t : tupleList) {
+         writer1.addTuple(t);
+       }
+     } finally {
+       writer1.close();
+     }
+   } finally {
+     // The sorted tuples are backed by the row block, so release it only after writing.
+     rowBlock.release();
+   }
+
+   return new DirectRawFileScanner(conf, schema, meta, outFile);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
new file mode 100644
index 0000000..4f87513
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
@@ -0,0 +1,196 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link PairWiseMerger} trees built over pre-sorted {@link DirectRawFileScanner}s emit
+ * as many tuples as their inputs hold, in non-descending order of {@link #comparator}.
+ *
+ * <p>Every case is checked for a left-deep and a right-deep merger tree, and each tree is read
+ * twice, before and after {@code reset()}.
+ */
+public class TestPairWiseMerger {
+  static TajoConf conf = new TajoConf();
+
+  /** Sort order used to generate the input files and to verify the merged output: col2, then col3. */
+  private static final BaseTupleComparator comparator =
+      new BaseTupleComparator(TestOffHeapRowBlock.schema,
+          new SortSpec[] {
+              new SortSpec(new Column("col2", TajoDataTypes.Type.INT4)),
+              new SortSpec(new Column("col3", TajoDataTypes.Type.INT8))
+          });
+
+  @Test
+  public void testPairWiseMergerWithTwoLeaves() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {500, 1000});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves1() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {1, 1, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves2() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {0, 0, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves3() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {0, 1, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves4() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {1, 0, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves5() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {1, 0, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves6() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {1, 1, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves7() throws IOException {
+    // Previously {1, 0, 0}, a duplicate of testPairWiseMergerWithThreeLeaves4; {0, 1, 1} is the one
+    // empty/non-empty combination of three leaves that no other case covers.
+    assertLeftAndRightDeepMerge(new int[] {0, 1, 1});
+  }
+
+  @Test
+  public void testThreeLevelPairWiseMerger1() throws IOException {
+    assertLeftAndRightDeepMerge(new int[] {500, 501, 499, 498, 489, 450, 431, 429});
+  }
+
+  /**
+   * Builds one sorted input scanner per entry of {@code rowNums} and verifies the merged output of a
+   * left-deep and then a right-deep merger tree over those scanners.
+   *
+   * @param rowNums number of rows generated for each leaf scanner; at least two entries are required
+   * @throws IOException if creating, reading, resetting, or closing a scanner fails
+   */
+  private static void assertLeftAndRightDeepMerge(int[] rowNums) throws IOException {
+    Scanner[] scanners = createScanners(rowNums);
+    assertMergeTwice(rowNums, createLeftDeepMerger(scanners));
+    // The same leaf scanners are reused for the second tree, after the first tree has been closed.
+    assertMergeTwice(rowNums, createRightDeepMerger(scanners));
+  }
+
+  /**
+   * Initializes {@code merger}, verifies its output, resets it, verifies the output again, and
+   * finally closes it, also when a verification fails.
+   */
+  private static void assertMergeTwice(int[] rowNums, PairWiseMerger merger) throws IOException {
+    merger.init();
+    try {
+      assertSortResult(rowNums, merger, comparator);
+      merger.reset();
+      assertSortResult(rowNums, merger, comparator);
+    } finally {
+      merger.close();
+    }
+  }
+
+  /** Builds (((s0, s1), s2), s3)...: every further scanner is merged with the previous merger on the left. */
+  private static PairWiseMerger createLeftDeepMerger(Scanner[] scanners) throws IOException {
+    requireAtLeastTwo(scanners);
+    PairWiseMerger merger = new PairWiseMerger(schema, scanners[0], scanners[1], comparator);
+    for (int i = 2; i < scanners.length; i++) {
+      merger = new PairWiseMerger(schema, merger, scanners[i], comparator);
+    }
+    return merger;
+  }
+
+  /** Builds (s3, (s2, (s0, s1)))...: every further scanner is merged with the previous merger on the right. */
+  private static PairWiseMerger createRightDeepMerger(Scanner[] scanners) throws IOException {
+    requireAtLeastTwo(scanners);
+    PairWiseMerger merger = new PairWiseMerger(schema, scanners[0], scanners[1], comparator);
+    for (int i = 2; i < scanners.length; i++) {
+      merger = new PairWiseMerger(schema, scanners[i], merger, comparator);
+    }
+    return merger;
+  }
+
+  /** Fails fast when fewer than two leaf scanners are given, since no merger can be built from them. */
+  private static void requireAtLeastTwo(Scanner[] scanners) {
+    if (scanners.length < 2) {
+      throw new IllegalArgumentException("at least two scanners are required, but got " + scanners.length);
+    }
+  }
+
+  /** Creates one sorted {@link DirectRawFileScanner} holding {@code rowNums[i]} rows for every i. */
+  private static Scanner[] createScanners(int[] rowNums) throws IOException {
+    DirectRawFileScanner[] scanners = new DirectRawFileScanner[rowNums.length];
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    for (int i = 0; i < rowNums.length; i++) {
+      scanners[i] = TestExternalSortExec.createSortedScanner(conf, meta, rowNums[i], comparator);
+    }
+    return scanners;
+  }
+
+  /**
+   * Drains {@code scanner} and asserts that consecutive tuples never decrease under {@code comparator}
+   * and that the number of tuples read equals the sum of {@code rowNums}.
+   */
+  private static void assertSortResult(int[] rowNums, Scanner scanner, BaseTupleComparator comparator)
+      throws IOException {
+    // NOTE(review): preVal keeps a reference to the previously returned tuple. If the scanner reuses
+    // one Tuple instance across next() calls, the order check compares a tuple with itself - confirm.
+    Tuple preVal = null;
+    int idx = 0;
+    for (Tuple curVal = scanner.next(); curVal != null; curVal = scanner.next()) {
+      if (preVal != null) {
+        assertTrue(idx + "th, prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      idx++;
+    }
+
+    int totalRowNum = 0;
+    for (int rowNum : rowNums) {
+      totalRowNum += rowNum;
+    }
+    assertEquals(totalRowNum, idx);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 5d809f8..0987a78 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -1071,7 +1071,7 @@ public class TestPhysicalPlanner {
keySchema.addColumn("?empId", Type.INT4);
SortSpec[] sortSpec = new SortSpec[1];
sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
- TupleComparator comp = new TupleComparator(keySchema, sortSpec);
+ BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec);
BSTIndex bst = new BSTIndex(conf);
BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
keySchema, comp);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index f649dac..8f130de 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -87,7 +87,7 @@ public class TestProgressExternalSortExec {
schema.addColumn("empid", TajoDataTypes.Type.INT4);
schema.addColumn("deptname", TajoDataTypes.Type.TEXT);
- TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
+ TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
Path employeePath = new Path(testDir, "employee.csv");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
@@ -170,7 +170,7 @@ public class TestProgressExternalSortExec {
Tuple curVal;
int cnt = 0;
exec.init();
- TupleComparator comparator = new TupleComparator(proj.getSchema(),
+ BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
new SortSpec[]{
new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)),
new SortSpec(new Column("empid", TajoDataTypes.Type.INT4))
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 4d4cc3d..5610a60 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -83,7 +83,7 @@ public class TestSortExec {
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath);
appender.init();
Tuple tuple = new VTuple(schema.size());
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 100000; i++) {
tuple.put(new Datum[] {
DatumFactory.createInt4(rnd.nextInt(5)),
DatumFactory.createInt4(rnd.nextInt(10)),
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index cecb281..78cce64 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -112,7 +112,7 @@ public class TestTupleUtil {
sortSpecs);
TupleRange [] ranges = partitioner.partition(5);
assertTrue(5 <= ranges.length);
- TupleComparator comp = new TupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
+ BaseTupleComparator comp = new BaseTupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema));
TupleRange prev = ranges[0];
for (int i = 1; i < ranges.length; i++) {
assertTrue(comp.compare(prev.getStart(), ranges[i].getStart()) < 0);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 5f8efe7..ccd2cde 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -174,7 +174,7 @@ public class TestRangeRetrieverHandler {
exec.close();
Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
- TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+ BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
BSTIndex bst = new BSTIndex(conf);
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
@@ -298,7 +298,7 @@ public class TestRangeRetrieverHandler {
exec.close();
Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
- TupleComparator comp = new TupleComparator(keySchema, sortSpecs);
+ BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
BSTIndex bst = new BSTIndex(conf);
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index a0cbe9f..5a14795 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -132,6 +132,7 @@
run cp -r $ROOT/tajo-catalog/target/tajo-catalog-${project.version}/* .
run cp -r $ROOT/tajo-storage/target/tajo-storage-${project.version}/* .
run cp -r $ROOT/tajo-yarn-pullserver/target/tajo-yarn-pullserver-${project.version}.jar .
+ run cp -r $ROOT/tajo-thirdparty/asm/target/tajo-thirdparty-asm-${project.version}.jar .
run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar .
run cp -r $ROOT/tajo-core/target/lib .
run cp -r ${project.basedir}/src/main/bin .
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index 5dae67e..6c8ef5d 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -17,6 +17,7 @@ package org.apache.tajo.jdbc; /**
*/
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
@@ -47,7 +48,12 @@ public class MetaDataTuple implements Tuple {
@Override
public boolean isNull(int fieldid) {
- return values.get(fieldid) == null || values.get(fieldid) instanceof NullDatum;
+ return values.get(fieldid) == null || values.get(fieldid).isNull();
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return !isNull(fieldid);
}
@Override
@@ -142,7 +148,12 @@ public class MetaDataTuple implements Tuple {
@Override
public ProtobufDatum getProtobufDatum(int fieldId) {
- throw new UnsupportedException();
+ throw new UnsupportedException("getProtobufDatum");
+ }
+
+ @Override
+ public IntervalDatum getInterval(int fieldId) {
+ throw new UnsupportedException("getInterval");
}
@Override
@@ -157,6 +168,6 @@ public class MetaDataTuple implements Tuple {
@Override
public Datum[] getValues(){
- throw new UnsupportedException();
+ throw new UnsupportedException("getValues");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index fe5366e..e65d844 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -736,6 +736,13 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${tajo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
<artifactId>tajo-yarn-pullserver</artifactId>
<version>${tajo.version}</version>
</dependency>