You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/18 03:45:56 UTC
[2/2] tajo git commit: TAJO-2109: Implement Radix sort.
TAJO-2109: Implement Radix sort.
Closes #992
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9afd9abe
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9afd9abe
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9afd9abe
Branch: refs/heads/master
Commit: 9afd9abe379cbef8c6ae2e17c19e280ed3ec2a07
Parents: 45100ce
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Apr 18 10:45:10 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Apr 18 10:45:10 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../main/java/org/apache/tajo/SessionVars.java | 3 +
.../apache/tajo/common/type/TajoTypeUtil.java | 19 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
.../main/java/org/apache/tajo/datum/Datum.java | 14 +-
.../org/apache/tajo/datum/DatumFactory.java | 6 +-
.../tajo/tuple/memory/UnSafeTupleList.java | 4 +
.../org/apache/tajo/datum/TestBytesDatum.java | 4 +-
.../apache/tajo/datum/TestTimestampDatum.java | 20 +-
tajo-core-tests/pom.xml | 12 +
.../tajo/engine/eval/TestSQLExpression.java | 2 +-
.../engine/function/TestDateTimeFunctions.java | 2 +-
.../planner/physical/TestExternalSortExec.java | 119 ++-
.../engine/planner/physical/TestRadixSort.java | 260 ++++++
.../apache/tajo/engine/query/TestSortQuery.java | 28 +-
.../apache/tajo/engine/util/BenchmarkSort.java | 239 +++++
.../queries/TestSortQuery/testSort.sql | 2 +-
.../queries/TestSortQuery/testSortDesc.sql | 2 +-
.../TestSortQuery/testSortWithAlias1.sql | 2 +-
.../testSortWithAliasButOriginalName.sql | 2 +-
.../queries/TestSortQuery/testSortWithExpr1.sql | 2 +-
.../queries/TestSortQuery/testTopK.sql | 2 +-
.../queries/TestSortQuery/testTopkWithJson.json | 8 +
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../engine/function/datetime/NowTimestamp.java | 2 +-
.../function/datetime/ToTimestampInt.java | 2 +-
.../engine/planner/UniformRangePartition.java | 6 +-
.../planner/physical/ExternalSortExec.java | 53 +-
.../tajo/engine/planner/physical/RadixSort.java | 921 +++++++++++++++++++
.../NonForwardQueryResultSystemScanner.java | 4 +-
.../java/org/apache/tajo/querymaster/Stage.java | 2 +-
.../org/apache/tajo/plan/logical/SortNode.java | 6 +-
tajo-project/pom.xml | 8 +-
.../org/apache/tajo/storage/StorageUtil.java | 38 -
.../org/apache/tajo/storage/TestStorages.java | 2 +-
.../apache/tajo/storage/jdbc/JdbcScanner.java | 2 +-
36 files changed, 1686 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7565faf..ac1a8aa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
NEW FEATURES
+ TAJO-2109: Implement Radix sort. (jihoon)
+
TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)
IMPROVEMENT
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index ba85549..ab00a41 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -165,6 +165,8 @@ public enum SessionVars implements ConfigKey {
COMPRESSED_RESULT_TRANSFER(ConfVars.$COMPRESSED_RESULT_TRANSFER, "Use compression to optimize result transmission.",
CLI_SIDE_VAR, Boolean.class, Validators.bool()),
+ SORT_ALGORITHM(ConfVars.$SORT_ALGORITHM, "sort algorithm", DEFAULT),
+
//-------------------------------------------------------------------------------
// Only for Unit Testing
//-------------------------------------------------------------------------------
@@ -174,6 +176,7 @@ public enum SessionVars implements ConfigKey {
TEST_FILTER_PUSHDOWN_ENABLED(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED, "filter push down enabled", TEST_VAR),
TEST_MIN_TASK_NUM(ConfVars.$TEST_MIN_TASK_NUM, "(test only) min task num", TEST_VAR),
TEST_PLAN_SHAPE_FIX_ENABLED(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED, "(test only) plan shape fix enabled", TEST_VAR),
+ TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT(ConfVars.$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, "(test only) Tim sort threshold for radix sort", TEST_VAR)
;
public static final Map<String, SessionVars> SESSION_VARS = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
index a70218d..ecaeeb1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
@@ -175,9 +175,28 @@ public class TajoTypeUtil {
case DATE:
case TIME:
case TIMESTAMP:
+ case INET4:
case VARCHAR:
+ case CHAR:
case TEXT: return false;
default: return true;
}
}
+
+ /**
+  * Tells whether the given type is numeric, i.e. accepted by either
+  * {@link #isNumber(Type)} or {@link #isReal(Type)}.
+  *
+  * @param type the type to test
+  * @return true if the type is an integer or a floating-point type
+  */
+ public static boolean isNumeric(Type type) {
+   if (isNumber(type)) {
+     return true;
+   }
+   return isReal(type);
+ }
+
+ /**
+  * Tells whether the given type is one of the integer types
+  * INT2, INT4 or INT8.
+  *
+  * @param type the type to test
+  * @return true for INT2, INT4 and INT8; false for anything else
+  */
+ public static boolean isNumber(Type type) {
+   // Identity comparisons are kept (rather than a switch) so that a null
+   // argument simply yields false.
+   if (type == Type.INT2 || type == Type.INT4) {
+     return true;
+   }
+   return type == Type.INT8;
+ }
+
+ /**
+  * Tells whether the given type is one of the floating-point types
+  * FLOAT4 or FLOAT8.
+  *
+  * @param type the type to test
+  * @return true for FLOAT4 and FLOAT8; false for anything else
+  */
+ public static boolean isReal(Type type) {
+   if (type == Type.FLOAT4) {
+     return true;
+   }
+   return type == Type.FLOAT8;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index c36f43b..24a5520 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -367,6 +367,7 @@ public class TajoConf extends Configuration {
$AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000),
$SORT_LIST_SIZE("tajo.executor.sort.list.size", 100000),
$JOIN_HASH_TABLE_SIZE("tajo.executor.join.hash-table.size", 100000),
+ $SORT_ALGORITHM("tajo.executor.sort.algorithm", "TIM"),
// for index
$INDEX_ENABLED("tajo.query.index.enabled", false),
@@ -399,6 +400,7 @@ public class TajoConf extends Configuration {
$TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true),
$TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
$TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false), // used for explain statement test
+ $TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-threshold", 65536),
// Behavior Control ---------------------------------------------------------
$BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
index 6aa11ce..e2173a8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
@@ -20,10 +20,11 @@ package org.apache.tajo.datum;
import com.google.gson.annotations.Expose;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.common.type.TajoTypeUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.exception.InvalidValueForCastException;
import org.apache.tajo.exception.InvalidOperationException;
+import org.apache.tajo.exception.InvalidValueForCastException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.json.CommonGsonHelper;
import org.apache.tajo.json.GsonObject;
@@ -120,20 +121,15 @@ public abstract class Datum implements Comparable<Datum>, GsonObject {
}
public boolean isNumeric() {
- return isNumber() || isReal();
+ return TajoTypeUtil.isNumeric(type);
}
public boolean isNumber() {
- return
- this.type == Type.INT2 ||
- this.type == Type.INT4 ||
- this.type == Type.INT8;
+ return TajoTypeUtil.isNumber(type);
}
public boolean isReal() {
- return
- this.type == Type.FLOAT4||
- this.type == Type.FLOAT8;
+ return TajoTypeUtil.isReal(type);
}
protected static void initAbortWhenDivideByZero(TajoConf tajoConf) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index dd4a4e4..e9ac0c5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -289,12 +289,12 @@ public class DatumFactory {
return new TimeDatum(DateTimeUtil.toTime(tm));
}
- public static TimestampDatum createTimestmpDatumWithJavaMillis(long millis) {
+ public static TimestampDatum createTimestampDatumWithJavaMillis(long millis) {
return new TimestampDatum(DateTimeUtil.javaTimeToJulianTime(millis));
}
- public static TimestampDatum createTimestmpDatumWithUnixTime(int unixTime) {
- return createTimestmpDatumWithJavaMillis(unixTime * 1000L);
+ public static TimestampDatum createTimestampDatumWithUnixTime(int unixTime) {
+ return createTimestampDatumWithJavaMillis(unixTime * 1000L);
}
public static TimestampDatum createTimestamp(String datetimeStr) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
index 4c4a6cb..7bad396 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
@@ -54,6 +54,10 @@ public class UnSafeTupleList extends ArrayList<UnSafeTuple> {
}
+ /**
+ * Returns the {@code dataTypes} array held by this tuple list.
+ * The reference is handed out as-is; no copy is made here, so callers
+ * should treat the returned array as read-only.
+ *
+ * @return the {@code dataTypes} array
+ */
+ public DataType[] getDataTypes() {
+ return dataTypes;
+ }
+
@Override
public boolean add(UnSafeTuple tuple) {
return addTuple(tuple);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
index 4dcbbee..c3a0e84 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
@@ -25,9 +25,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestBytesDatum {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
index f82f66d..68b34a6 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
@@ -47,33 +47,33 @@ public class TestTimestampDatum {
@Test
public final void testType() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(Type.TIMESTAMP, d.type());
}
@Test(expected = TajoRuntimeException.class)
public final void testAsInt4() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
d.asInt4();
}
@Test
public final void testAsInt8() {
- Datum d = DatumFactory.createTimestmpDatumWithJavaMillis(unixtime * 1000);
+ Datum d = DatumFactory.createTimestampDatumWithJavaMillis(unixtime * 1000);
long javaTime = unixtime * 1000;
assertEquals(DateTimeUtil.javaTimeToJulianTime(javaTime), d.asInt8());
}
@Test(expected = TajoRuntimeException.class)
public final void testAsFloat4() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
d.asFloat4();
}
@Test(expected = TajoRuntimeException.class)
public final void testAsFloat8() {
int instance = 1386577582;
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(instance);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(instance);
d.asFloat8();
}
@@ -97,7 +97,7 @@ public class TestTimestampDatum {
@Test
public final void testSize() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(TimestampDatum.SIZE, d.asByteArray().length);
}
@@ -112,7 +112,7 @@ public class TestTimestampDatum {
@Test
public final void testToJson() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
Datum copy = CommonGsonHelper.fromJson(d.toJson(), Datum.class);
assertEquals(d, copy);
}
@@ -168,12 +168,12 @@ public class TestTimestampDatum {
assertEquals(uTime, DateTimeUtil.julianTimeToEpoch(julianTimestamp));
assertEquals(jTime, DateTimeUtil.julianTimeToJavaTime(julianTimestamp));
- TimestampDatum datum3 = DatumFactory.createTimestmpDatumWithJavaMillis(jTime);
+ TimestampDatum datum3 = DatumFactory.createTimestampDatumWithJavaMillis(jTime);
assertEquals(cal.get(Calendar.YEAR), datum3.getYear());
assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear());
assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth());
- datum3 = DatumFactory.createTimestmpDatumWithUnixTime(uTime);
+ datum3 = DatumFactory.createTimestampDatumWithUnixTime(uTime);
assertEquals(cal.get(Calendar.YEAR), datum3.getYear());
assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear());
assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth());
@@ -182,7 +182,7 @@ public class TestTimestampDatum {
@Test
public final void testNull() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(Boolean.FALSE,d.equals(DatumFactory.createNullDatum()));
assertEquals(DatumFactory.createNullDatum(),d.equalsTo(DatumFactory.createNullDatum()));
assertEquals(-1,d.compareTo(DatumFactory.createNullDatum()));
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 6de5546..b12642a 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -350,6 +350,18 @@
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>1.11.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>1.11.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
index 2db826b..fa4561a 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
@@ -860,7 +860,7 @@ public class TestSQLExpression extends ExprTestBase {
TimeZone tz = TimeZone.getTimeZone("GMT-6");
int unixtime = 1389071574; // (int) (System.currentTimeMillis() / 1000);
- TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
testSimpleEval(context, String.format("select to_timestamp(CAST(split_part('%d.999', '.', 1) as INT8));", unixtime),
new String[] {TimestampDatum.asChars(expected.asTimeMeta(), tz, false)});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
index dc9bd25..36a4a60 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
@@ -41,7 +41,7 @@ public class TestDateTimeFunctions extends ExprTestBase {
@Test
public void testToTimestamp() throws TajoException {
long expectedTimestamp = System.currentTimeMillis();
- TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime((int)(expectedTimestamp/ 1000));
+ TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime((int)(expectedTimestamp/ 1000));
// (expectedTimestamp / 1000) means the translation from millis seconds to unix timestamp
String q1 = String.format("select to_timestamp(%d);", (expectedTimestamp / 1000));
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 580fe86..788ebeb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -30,12 +30,15 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.physical.ExternalSortExec.SortAlgorithm;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
@@ -46,14 +49,20 @@ import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Random;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class TestExternalSortExec {
private TajoConf conf;
private TajoTestingCluster util;
@@ -61,12 +70,27 @@ public class TestExternalSortExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
+ private LogicalOptimizer optimizer;
private Path testDir;
+ private Schema tableSchema;
private final int numTuple = 1000;
private Random rnd = new Random(System.currentTimeMillis());
private TableDesc employee;
+ private String sortAlgorithmString;
+
+ /**
+ * Invoked by the Parameterized runner, once per entry returned by
+ * generateParameters().
+ *
+ * @param sortAlgorithm name of the sort algorithm to remember for this run
+ */
+ public TestExternalSortExec(String sortAlgorithm) {
+ this.sortAlgorithmString = sortAlgorithm;
+ }
+
+ /**
+  * Supplies the sort algorithm names this test class is run with: one run
+  * for {@code TIM} and one for {@code MSD_RADIX}.
+  * The run name includes the algorithm so that a failure report shows
+  * which algorithm was under test.
+  *
+  * @return one single-element argument array per sort algorithm
+  */
+ @Parameters(name = "{index}: {0}")
+ public static Collection<Object[]> generateParameters() {
+   return Arrays.asList(new Object[][]{
+     {SortAlgorithm.TIM.name()},
+     {SortAlgorithm.MSD_RADIX.name()},
+   });
+ }
@Before
public void setUp() throws Exception {
@@ -79,33 +103,81 @@ public class TestExternalSortExec {
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
- Schema schema = SchemaFactory.newV1();
- schema.addColumn("managerid", Type.INT4);
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("deptname", Type.TEXT);
+ tableSchema = SchemaFactory.newV1(new Column[] {
+ new Column("managerid", Type.INT8),
+ new Column("empid", Type.INT4),
+ new Column("deptname", Type.TEXT),
+ new Column("col1", Type.INT8),
+ new Column("col2", Type.INT8),
+ new Column("col3", Type.INT8),
+ new Column("col4", Type.INT8),
+ new Column("col5", Type.INT8),
+ new Column("col6", Type.INT8),
+ new Column("col7", Type.INT8),
+ new Column("col8", Type.INT8),
+ new Column("col9", Type.INT8),
+ new Column("col10", Type.INT8),
+ new Column("col11", Type.INT8),
+ new Column("col12", Type.INT8)
+ });
TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
Path employeePath = new Path(testDir, "employee.csv");
Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
- .getAppender(employeeMeta, schema, employeePath);
+ .getAppender(employeeMeta, tableSchema, employeePath);
appender.enableStats();
appender.init();
- VTuple tuple = new VTuple(schema.size());
+ VTuple tuple = new VTuple(tableSchema.size());
for (int i = 0; i < numTuple; i++) {
- tuple.put(new Datum[] {
- DatumFactory.createInt4(rnd.nextInt(50)),
- DatumFactory.createInt4(rnd.nextInt(100)),
- DatumFactory.createText("dept_" + i),
- });
+ if (rnd.nextInt(1000) == 0) {
+ tuple.put(new Datum[] {
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ });
+ } else {
+ boolean positive = rnd.nextInt(2) == 0;
+ tuple.put(new Datum[]{
+ DatumFactory.createInt8(positive ? 100_000 + rnd.nextInt(100_000) : (100_000 + rnd.nextInt(100_000)) * -1),
+ DatumFactory.createInt4(rnd.nextInt(100)),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ });
+ }
appender.addTuple(tuple);
}
+
appender.flush();
appender.close();
- employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
+ employee = new TableDesc("default.employee", tableSchema, employeeMeta, employeePath.toUri());
catalog.createTable(employee);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+ optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
}
@After
@@ -122,7 +194,8 @@ public class TestExternalSortExec {
public final void testNext() throws IOException, TajoException {
conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2);
QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
- queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 1);
+ queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithmString);
+ queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 4);
FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getUri()), Integer.MAX_VALUE);
@@ -132,28 +205,32 @@ public class TestExternalSortExec {
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = plan.getRootBlock().getRoot();
+ LogicalNode rootNode = optimizer.optimize(plan);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
- ProjectionExec proj = (ProjectionExec) exec;
Tuple tuple;
Tuple preVal = null;
Tuple curVal;
int cnt = 0;
exec.init();
- long start = System.currentTimeMillis();
- BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
+ Schema sortSchema = SchemaFactory.newV1(new Column[] {
+ new Column("managerid", Type.INT8),
+ new Column("empid", Type.INT4),
+ });
+
+ BaseTupleComparator comparator = new BaseTupleComparator(sortSchema,
new SortSpec[]{
- new SortSpec(new Column("managerid", Type.INT4)),
- new SortSpec(new Column("empid", Type.INT4))
+ new SortSpec(new Column("managerid", Type.INT8)),
+ new SortSpec(new Column("empid", Type.INT4)),
});
+ long start = System.currentTimeMillis();
while ((tuple = exec.next()) != null) {
curVal = tuple;
if (preVal != null) {
- assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+ assertTrue("prev: " + preVal + ", but cur: " + curVal + ", cnt: " + cnt, comparator.compare(preVal, curVal) <= 0);
}
preVal = new VTuple(curVal);
cnt++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
new file mode 100644
index 0000000..8246834
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.physical.ExternalSortExec.UnSafeComparator;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
+import org.apache.tajo.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestRadixSort {
+ private final static QueryContext queryContext;
+ private static UnSafeTupleList tuples;
+ private static Schema schema;
+ private static final int tupleNum = 1000;
+ private static final Random random = new Random(System.currentTimeMillis());
+ private SortSpec[] sortSpecs;
+ private final static Datum MINUS_ONE = DatumFactory.createInt4(-1);
+
+ // Fixtures shared by every parameterized run: the query context and the
+ // nine-column schema of the tuples to be sorted.
+ static {
+ queryContext = new QueryContext(new TajoConf());
+ // Test-only Tim sort threshold is set to 0.
+ // NOTE(review): presumably this keeps RadixSort from falling back to Tim sort
+ // on small inputs -- confirm against RadixSort.
+ queryContext.setInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, 0);
+
+ // One column per data type exercised by this test.
+ schema = SchemaFactory.newV1(new Column[]{
+ new Column("col0", Type.INT8),
+ new Column("col1", Type.INT4),
+ new Column("col2", Type.INT2),
+ new Column("col3", Type.DATE),
+ new Column("col4", Type.TIMESTAMP),
+ new Column("col5", Type.TIME),
+ new Column("col6", Type.INET4),
+ new Column("col7", Type.FLOAT4),
+ new Column("col8", Type.FLOAT8)
+ });
+ }
+
+ private static class Param {
+ final SortSpec[] sortSpecs;
+
+ public Param(SortSpec[] param) {
+ this.sortSpecs = param;
+ }
+
+ @Override
+ public String toString() {
+ return StringUtils.join(sortSpecs);
+ }
+ }
+
+ /**
+ * Invoked by the Parameterized runner, once per entry returned by
+ * generateParameters().
+ *
+ * @param param holder of the sort specs used by this run
+ */
+ public TestRadixSort(Param param) {
+ this.sortSpecs = param.sortSpecs;
+ }
+
+ @Parameters(name = "{index}: {0}")
+ public static Collection<Object[]> generateParameters() {
+ List<Object[]> params = new ArrayList<>();
+
+ // Test every single column sort
+ for (int i = 0; i < schema.size(); i++) {
+ params.add(new Object[] {
+ new Param(
+ new SortSpec[] {
+ new SortSpec(schema.getColumn(i), random.nextBoolean(), random.nextBoolean())
+ })
+ });
+ }
+
+ // Randomly choose columns
+ for (int colNum = 2; colNum < 6; colNum++) {
+ for (int i =0; i < 5; i++) {
+ SortSpec[] sortSpecs = new SortSpec[colNum];
+ for (int j = 0; j <colNum; j++) {
+ sortSpecs[j] = new SortSpec(schema.getColumn(random.nextInt(schema.size())),
+ random.nextBoolean(), random.nextBoolean());
+ }
+ params.add(new Object[] {new Param(sortSpecs)});
+ }
+ }
+
+ return params;
+ }
+
+ /**
+  * Fills {@code tuples} with {@code tupleNum} tuples before each test.
+  * The first {@code tupleNum - 6} tuples are drawn at random: null, max and
+  * min tuples are each chosen with probability 1/10, fully random tuples
+  * otherwise. The last six are fixed so that at least two max, two min and
+  * two null tuples are always present.
+  */
+ @Before
+ public void setup() {
+   DataType[] columnTypes = schema.getRootColumns().stream()
+       .map(Column::getDataType)
+       .toArray(DataType[]::new);
+   tuples = new UnSafeTupleList(columnTypes, tupleNum);
+
+   // A single tuple object is overwritten and re-added for every row.
+   VTuple reusable = new VTuple(schema.size());
+   for (int n = 0; n < tupleNum - 6; n++) {
+     int bucket = random.nextInt(10);
+     if (bucket == 0) {
+       makeNullTuple(reusable);
+     } else if (bucket == 1) {
+       makeMaxTuple(reusable);
+     } else if (bucket == 2) {
+       makeMinTuple(reusable);
+     } else {
+       makeRandomTuple(reusable);
+     }
+
+     tuples.addTuple(reusable);
+   }
+
+   // Guarantee at least two max, min and null tuples (in that order, twice).
+   for (int round = 0; round < 2; round++) {
+     makeMaxTuple(reusable);
+     tuples.addTuple(reusable);
+     makeMinTuple(reusable);
+     tuples.addTuple(reusable);
+     makeNullTuple(reusable);
+     tuples.addTuple(reusable);
+   }
+ }
+
+ /** Releases the tuple list that setup() created for the test that just ran. */
+ @After
+ public void teardown() {
+ tuples.release();
+ }
+
+ /**
+  * Overwrites the given tuple so that every column holds a null datum.
+  * The number of columns is taken from {@code schema} instead of being
+  * spelled out, so this stays correct when columns are added to the schema.
+  *
+  * @param tuple the tuple to overwrite
+  * @return the same tuple instance
+  */
+ private static Tuple makeNullTuple(Tuple tuple) {
+   Datum[] nulls = new Datum[schema.size()];
+   Arrays.fill(nulls, NullDatum.get());
+   tuple.put(nulls);
+   return tuple;
+ }
+
+ /**
+  * Overwrites the given tuple with random values, one per schema column.
+  * The date, timestamp and time columns receive non-negative values only;
+  * the three integer columns (0-2) and the two floating-point columns (7-8)
+  * are each negated with probability 1/2.
+  *
+  * @param tuple the tuple to overwrite
+  * @return the same tuple instance
+  */
+ private static Tuple makeRandomTuple(Tuple tuple) {
+   tuple.put(new Datum[]{
+     DatumFactory.createInt8(random.nextLong()),
+     DatumFactory.createInt4(random.nextInt()),
+     DatumFactory.createInt2((short) random.nextInt(Short.MAX_VALUE)),
+     // The sign bit is masked off instead of calling Math.abs(), because
+     // Math.abs(MIN_VALUE) is still negative.
+     DatumFactory.createDate(random.nextInt() & Integer.MAX_VALUE),
+     DatumFactory.createTimestamp(random.nextLong() & Long.MAX_VALUE),
+     DatumFactory.createTime(random.nextLong() & Long.MAX_VALUE),
+     DatumFactory.createInet4(random.nextInt()),
+     DatumFactory.createFloat4(random.nextFloat()),
+     DatumFactory.createFloat8(random.nextDouble())
+   });
+
+   // Randomly flip the sign of the integer columns.
+   for (int i = 0; i < 3; i++) {
+     if (random.nextBoolean()) {
+       tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE));
+     }
+   }
+
+   // Randomly flip the sign of the floating-point columns.
+   for (int i = 7; i < 9; i++) {
+     if (random.nextBoolean()) {
+       tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE));
+     }
+   }
+
+   return tuple;
+ }
+
+ /**
+  * Overwrites the given tuple with the largest value used by this test
+  * for each column ({@code MAX_VALUE} of the backing Java primitive).
+  *
+  * @param tuple the tuple to overwrite
+  * @return the same tuple instance
+  */
+ private static Tuple makeMaxTuple(Tuple tuple) {
+   Datum[] maxima = {
+     DatumFactory.createInt8(Long.MAX_VALUE),
+     DatumFactory.createInt4(Integer.MAX_VALUE),
+     DatumFactory.createInt2(Short.MAX_VALUE),
+     DatumFactory.createDate(Integer.MAX_VALUE),
+     DatumFactory.createTimestamp(Long.MAX_VALUE),
+     DatumFactory.createTime(Long.MAX_VALUE),
+     DatumFactory.createInet4(Integer.MAX_VALUE),
+     DatumFactory.createFloat4(Float.MAX_VALUE),
+     DatumFactory.createFloat8(Double.MAX_VALUE)
+   };
+   tuple.put(maxima);
+   return tuple;
+ }
+
+ /**
+  * Overwrites the given tuple with the smallest value used by this test
+  * for each column: {@code MIN_VALUE} for the integer and INET4 columns,
+  * 0 for the date, timestamp and time columns, and the most negative
+  * finite value for the floating-point columns.
+  *
+  * @param tuple the tuple to overwrite
+  * @return the same tuple instance
+  */
+ private static Tuple makeMinTuple(Tuple tuple) {
+   tuple.put(new Datum[]{
+     DatumFactory.createInt8(Long.MIN_VALUE),
+     DatumFactory.createInt4(Integer.MIN_VALUE),
+     DatumFactory.createInt2(Short.MIN_VALUE),
+     DatumFactory.createDate(0),
+     DatumFactory.createTimestamp(0),
+     DatumFactory.createTime(0),
+     DatumFactory.createInet4(Integer.MIN_VALUE),
+     // Float.MIN_VALUE / Double.MIN_VALUE are the smallest POSITIVE values,
+     // so the most negative finite values are -MAX_VALUE.
+     DatumFactory.createFloat4(-Float.MAX_VALUE),
+     DatumFactory.createFloat8(-Double.MAX_VALUE)
+   });
+
+   return tuple;
+ }
+
+ @Test
+ public void testSort() {
+ Comparator<UnSafeTuple> comparator = new UnSafeComparator(schema, sortSpecs);
+
+ RadixSort.sort(queryContext, tuples, schema, sortSpecs, comparator);
+
+ IntStream.range(0, tuples.size() - 1)
+ .forEach(i -> {
+ assertTrue(tuples.get(i) + " precedes " + tuples.get(i + 1) + " at " + i,
+ comparator.compare(tuples.get(i), tuples.get(i + 1)) <= 0);
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 582d0b0..ef3336d 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -25,26 +25,47 @@ import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
public class TestSortQuery extends QueryTestCaseBase {
- public TestSortQuery() {
+ public TestSortQuery(String sortAlgorithm) {
super(TajoConstants.DEFAULT_DATABASE_NAME);
Map<String, String> variables = new HashMap<>();
variables.put(SessionVars.SORT_LIST_SIZE.keyname(), "100");
+ variables.put(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithm);
client.updateSessionVariables(variables);
}
+ /**
+  * Unsets the sort-algorithm session variable that the constructor sets for each parameterized run.
+  */
+ @AfterClass
+ public static void tearDown() throws Exception {
+   final String sortAlgorithmKey = SessionVars.SORT_ALGORITHM.keyname();
+   client.unsetSessionVariables(Arrays.asList(sortAlgorithmKey));
+ }
+
+ /**
+  * Supplies the sort algorithm names the parameterized runner passes to the constructor,
+  * one single-element argument array per run.
+  */
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+   final Object[] timSortArgs = {"TIM"};
+   final Object[] msdRadixSortArgs = {"MSD_RADIX"};
+   return Arrays.asList(timSortArgs, msdRadixSortArgs);
+ }
+
@Test
public final void testSort() throws Exception {
ResultSet res = executeQuery();
@@ -170,6 +191,8 @@ public class TestSortQuery extends QueryTestCaseBase {
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
+
+ executeString("drop table testSortWithDate");
}
}
@@ -188,6 +211,8 @@ public class TestSortQuery extends QueryTestCaseBase {
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
+
+ executeString("drop table table2");
}
@Test
@@ -446,6 +471,7 @@ public class TestSortQuery extends QueryTestCaseBase {
cleanupQuery(res);
} finally {
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+ executeString("drop table testOutOfScope");
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
new file mode 100644
index 0000000..1cc526f
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.planner.physical.TestExternalSortExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+/**
+ * JMH benchmark that executes the same single-column ORDER BY query through the physical planner,
+ * once with the TIM sort algorithm and once with the MSD_RADIX sort algorithm.
+ *
+ * <p>{@link #setup()} starts a catalog cluster and writes a randomly generated "employee" table;
+ * {@link #tearDown()} removes the test directory and shuts the catalog cluster down.</p>
+ */
+@State(Scope.Benchmark)
+public class BenchmarkSort {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/BenchmarkSort";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private Path testDir;
+
+  /** Number of rows written to the benchmark table. */
+  private final int numTuple = 10000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  String[] QUERIES = {
+      "select col0 from employee order by col0"
+  };
+
+  @State(Scope.Thread)
+  public static class BenchContext {
+    int sortBufferSize;
+  }
+
+  /**
+   * Starts the catalog, creates the tablespace and database, writes {@code numTuple} rows into
+   * the "employee" table and registers it, then creates the analyzer, planner and optimizer
+   * used by the benchmark methods.
+   */
+  @Setup
+  public void setup() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getCatalogService();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+
+    Schema schema = SchemaFactory.newV1(new Column[] {
+        new Column("col0", Type.INT8),
+        new Column("col1", Type.INT4),
+        new Column("col2", Type.INT2),
+        new Column("col3", Type.DATE),
+        new Column("col4", Type.TIMESTAMP),
+        new Column("col5", Type.TIME),
+        new Column("col6", Type.INET4),
+        new Column("col7", Type.FLOAT4),
+        new Column("col8", Type.FLOAT8),
+        new Column("col9", Type.INT8),
+        new Column("col10", Type.INT8),
+        new Column("col11", Type.INT8),
+        new Column("col12", Type.INT8),
+        new Column("col13", Type.INT8),
+        new Column("col14", Type.INT8),
+    });
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    VTuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      // Roughly one row in 10000 consists of NULLs only.
+      if (rnd.nextInt(10000) == 0) {
+        tuple.put(nullRow(schema.size()));
+      } else {
+        tuple.put(randomRow());
+      }
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
+    catalog.createTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
+  }
+
+  /** Builds a row of the given width in which every field is the NULL datum. */
+  private static Datum[] nullRow(int width) {
+    Datum[] row = new Datum[width];
+    for (int i = 0; i < width; i++) {
+      row[i] = NullDatum.get();
+    }
+    return row;
+  }
+
+  /** Builds one row of random values, in the column order of the schema created in setup(). */
+  private Datum[] randomRow() {
+    return new Datum[]{
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt4(rnd.nextInt()),
+        DatumFactory.createInt2((short) rnd.nextInt(Short.MAX_VALUE)),
+        // Clear the sign bit instead of calling Math.abs(): Math.abs(MIN_VALUE) is still
+        // negative, so abs() cannot guarantee a non-negative result.
+        DatumFactory.createDate(rnd.nextInt() & Integer.MAX_VALUE),
+        DatumFactory.createTimestamp(rnd.nextLong() & Long.MAX_VALUE),
+        DatumFactory.createTime(rnd.nextLong() & Long.MAX_VALUE),
+        DatumFactory.createInet4(rnd.nextInt()),
+        DatumFactory.createFloat4(rnd.nextFloat()),
+        DatumFactory.createFloat8(rnd.nextDouble()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt8(rnd.nextLong()),
+        DatumFactory.createInt8(rnd.nextLong())
+    };
+  }
+
+  /** Removes the test directory and shuts down the catalog cluster started in setup(). */
+  @TearDown
+  public void tearDown() throws IOException {
+    try {
+      CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    } finally {
+      // Shut the catalog down even when the directory cleanup fails.
+      util.shutdownCatalogCluster();
+    }
+  }
+
+  /** Benchmarks the ORDER BY query with the session sort algorithm set to "TIM". */
+  @Benchmark
+  @BenchmarkMode(Mode.All)
+  public void timSort(BenchContext context) throws InterruptedException, IOException, TajoException {
+    runSort("TIM");
+  }
+
+  /** Benchmarks the ORDER BY query with the session sort algorithm set to "MSD_RADIX". */
+  @Benchmark
+  @BenchmarkMode(Mode.All)
+  public void msdRadixSort(BenchContext context) throws InterruptedException, IOException, TajoException {
+    runSort("MSD_RADIX");
+  }
+
+  /**
+   * Plans and fully executes the benchmark query with the given sort algorithm.
+   *
+   * @param sortAlgorithmName value stored under the SORT_ALGORITHM session variable
+   */
+  private void runSort(String sortAlgorithmName) throws InterruptedException, IOException, TajoException {
+    QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+    queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 200);
+    queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithmName);
+
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
+        LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    try {
+      // Drain the executor; the produced tuples themselves are not needed.
+      while (exec.next() != null) {}
+    } finally {
+      exec.close();
+    }
+  }
+
+  /** Runs this benchmark class with one warm-up iteration, one measurement iteration and one fork. */
+  public static void main(String[] args) throws RunnerException {
+    Options opt = new OptionsBuilder()
+        .include(BenchmarkSort.class.getSimpleName())
+        .warmupIterations(1)
+        .measurementIterations(1)
+        .forks(1)
+        .build();
+
+    new Runner(opt).run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
index 7958002..ac79024 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey from lineitem order by l_orderkey;
\ No newline at end of file
+select l_linenumber, l_orderkey from lineitem order by l_orderkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
index 4252643..6636bed 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey from lineitem order by l_orderkey desc;
\ No newline at end of file
+select l_linenumber, l_orderkey from lineitem order by l_orderkey desc, l_linenumber asc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
index cd8be3e..fd88b7f 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
index 1d6396a..2be75a8 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
index 2aeba26..ee3edda 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
index 331f3b4..65519f0 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
@@ -1 +1 @@
-select l_orderkey, l_linenumber from lineitem order by l_orderkey desc limit 3;
\ No newline at end of file
+select l_orderkey, l_linenumber from lineitem order by l_orderkey desc, l_linenumber asc limit 3;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
index e3a264f..333037b 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
@@ -32,6 +32,14 @@
},
"IsAsc": false,
"IsNullFirst": false
+ },
+ {
+ "SortKey": {
+ "ColumnName": "l_linenumber",
+ "OpType": "Column"
+ },
+ "IsAsc": true,
+ "IsNullFirst": false
}
],
"Expr": {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 46e6b76..5a6198e 100644
--- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -48,4 +48,5 @@ Available Session Variables:
\set FETCH_ROWNUM [int value] - The number of rows to be fetched from Master at a time
\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
\set COMPRESSED_RESULT_TRANSFER [true or false] - Use compression to optimize result transmission.
+\set SORT_ALGORITHM [text value] - sort algorithm
\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
index dd0d195..2f298a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
@@ -44,7 +44,7 @@ public class NowTimestamp extends GeneralFunction {
@Override
public Datum eval(Tuple params) {
if (datum == null) {
- datum = DatumFactory.createTimestmpDatumWithJavaMillis(System.currentTimeMillis());
+ datum = DatumFactory.createTimestampDatumWithJavaMillis(System.currentTimeMillis());
}
return datum;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
index 5468b19..63b725c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
@@ -49,6 +49,6 @@ public class ToTimestampInt extends GeneralFunction {
if (params.isBlankOrNull(0)) {
return NullDatum.get();
}
- return DatumFactory.createTimestmpDatumWithUnixTime(params.getInt4(0));
+ return DatumFactory.createTimestampDatumWithUnixTime(params.getInt4(0));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 02e397d..0b8199f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -657,13 +657,13 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
break;
case TIMESTAMP:
if (overflowFlag[i]) {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(
mergedRange.getStart().getInt8(i) + incs[i].longValue()));
} else {
if (sortSpecs[i].isAscending()) {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue()));
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue()));
} else {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue()));
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue()));
}
}
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index ff629c3..e269bf6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -36,11 +36,13 @@ import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.engine.planner.PhysicalPlanningException;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
@@ -53,10 +55,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -74,6 +73,12 @@ import java.util.concurrent.Future;
* </ul>
*/
public class ExternalSortExec extends SortExec {
+
+ enum SortAlgorithm{
+ TIM,
+ MSD_RADIX,
+ }
+
/** Class logger */
private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
/** The prefix of fragment name for intermediate */
@@ -117,6 +122,8 @@ public class ExternalSortExec extends SortExec {
/** total bytes of input data */
private long inputBytes;
+ private final SortAlgorithm sortAlgorithm;
+
private ExternalSortExec(final TaskAttemptContext context, final SortNode plan)
throws PhysicalPlanningException {
super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
@@ -133,6 +140,28 @@ public class ExternalSortExec extends SortExec {
this.localFS = new RawLocalFileSystem();
this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
this.inputStats = new TableStats();
+ this.sortAlgorithm = getSortAlgorithm(context.getQueryContext(), sortSpecs);
+ LOG.info(sortAlgorithm.name() + " sort is selected");
+ }
+
+ /**
+  * Resolves the sort algorithm to use from the SORT_ALGORITHM session variable.
+  *
+  * <p>The name is matched case-insensitively and defaults to TIM when the variable is not set.
+  * MSD_RADIX is chosen only when it is requested and every sort key passes
+  * {@code RadixSort.isApplicableType}; in all other cases TIM is returned, with a warning when
+  * the request could not be honored or the name is unknown.</p>
+  *
+  * @param context query context the session variable is read from
+  * @param sortSpecs sort keys of this executor
+  * @return the algorithm to sort with
+  */
+ private static SortAlgorithm getSortAlgorithm(QueryContext context, SortSpec[] sortSpecs) {
+   String sortAlgorithm = context.get(SessionVars.SORT_ALGORITHM, SortAlgorithm.TIM.name());
+
+   if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) {
+     // The key types only matter when radix sort is requested; allMatch stops at the first
+     // non-applicable key instead of counting all of them.
+     if (Arrays.stream(sortSpecs).allMatch(sortSpec -> RadixSort.isApplicableType(sortSpec))) {
+       return SortAlgorithm.MSD_RADIX;
+     }
+     LOG.warn("Non-applicable types exist. Falling back to " + SortAlgorithm.TIM.name() + " sort");
+     return SortAlgorithm.TIM;
+   }
+
+   if (!sortAlgorithm.equalsIgnoreCase(SortAlgorithm.TIM.name())) {
+     // Report an unrecognized name regardless of the sort key types.
+     LOG.warn("Unknown sort type: " + sortAlgorithm);
+     LOG.warn("Falling back to " + SortAlgorithm.TIM.name() + " sort");
+   }
+   return SortAlgorithm.TIM;
+ }
public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode,
@@ -172,6 +201,18 @@ public class ExternalSortExec extends SortExec {
return this.plan;
}
+ /**
+  * Sorts the given tuple block with the algorithm selected for this executor.
+  *
+  * @param tupleBlock tuples to sort
+  * @return the list returned by the selected sort implementation
+  * @throws TajoRuntimeException wrapping an UnsupportedException if the selected algorithm has no
+  *         implementation here
+  */
+ private List<UnSafeTuple> sort(UnSafeTupleList tupleBlock) {
+   if (sortAlgorithm == SortAlgorithm.TIM) {
+     return OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator);
+   }
+   if (sortAlgorithm == SortAlgorithm.MSD_RADIX) {
+     return RadixSort.sort(context.getQueryContext(), tupleBlock, inSchema, sortSpecs, unSafeComparator);
+   }
+   // Not reachable with the current enum constants; fail loudly if a new one is added
+   // without a matching branch.
+   throw new TajoRuntimeException(new UnsupportedException(sortAlgorithm.name()));
+ }
+
/**
* Sort a tuple block and store them into a chunk file
*/
@@ -180,7 +221,7 @@ public class ExternalSortExec extends SortExec {
int rowNum = tupleBlock.size();
long sortStart = System.currentTimeMillis();
- OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator);
+ this.sort(tupleBlock);
long sortEnd = System.currentTimeMillis();
long chunkWriteStart = System.currentTimeMillis();
@@ -527,7 +568,7 @@ public class ExternalSortExec extends SortExec {
if (chunk.isMemory()) {
long sortStart = System.currentTimeMillis();
- OffHeapRowBlockUtils.sort(inMemoryTable, unSafeComparator);
+ this.sort(inMemoryTable);
Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem());
if(LOG.isDebugEnabled()) {
debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)