You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/18 03:45:55 UTC
[1/2] tajo git commit: TAJO-2109: Implement Radix sort.
Repository: tajo
Updated Branches:
refs/heads/master 45100ced2 -> 9afd9abe3
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
new file mode 100644
index 0000000..e804941
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.type.TajoTypeUtil;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
+import org.apache.tajo.util.SizeOf;
+import sun.misc.Contended;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ *
+ * Radix sort implementation (https://en.wikipedia.org/wiki/Radix_sort).
+ * This implementation uses the hybrid approach which consists of MSD radix sort and Tim sort.
+ *
+ * It first organizes the given tuples into several bins which have the same radix key. For each bin, it groups
+ * it again using MSD radix sort if the length of the bin is sufficiently large. Otherwise, it simply sorts that bin
+ * using Tim sort.
+ *
+ */
+public class RadixSort {
+
+ private static final Log LOG = LogFactory.getLog(RadixSort.class);
+
+ private static class RadixSortContext {
+ @Contended UnSafeTuple[] in;
+ @Contended UnSafeTuple[] out;
+ @Contended final int[] keys;
+
+ final int[] sortKeyIds;
+ final int maxSortKeyId;
+ final Type[] sortKeyTypes;
+ final boolean[] asc;
+ final boolean[] nullFirst;
+ final Comparator<UnSafeTuple> comparator;
+
+ // If the number of tuples to be sorted does not exceed this value, Tim sort is used.
+ // The default value is 65536 which is got from some experiments.
+ final int timSortThreshold;
+
+ long msdRadixSortTime = 0;
+ long histogramPrepareTime = 0;
+ long swapTime = 0;
+ long histogramBuildTime = 0;
+ int msdRadixSortCall = 0;
+
+ public RadixSortContext(UnSafeTuple[] in, Schema schema, SortSpec[] sortSpecs, Comparator<UnSafeTuple> comparator,
+ int timSortThreshold) {
+ this.in = in;
+ this.out = new UnSafeTuple[in.length];
+ this.keys = new int[in.length];
+ this.maxSortKeyId = sortSpecs.length - 1;
+ this.sortKeyIds = new int[sortSpecs.length];
+ sortKeyTypes = new Type[sortSpecs.length];
+ asc = new boolean[sortSpecs.length];
+ nullFirst = new boolean[sortSpecs.length];
+ for (int i = 0; i < sortSpecs.length; i++) {
+ if (sortSpecs[i].getSortKey().hasQualifier()) {
+ this.sortKeyIds[i] = schema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
+ } else {
+ this.sortKeyIds[i] = schema.getColumnIdByName(sortSpecs[i].getSortKey().getSimpleName());
+ }
+ this.asc[i] = sortSpecs[i].isAscending();
+ this.nullFirst[i] = sortSpecs[i].isNullsFirst();
+ this.sortKeyTypes[i] = sortSpecs[i].getSortKey().getDataType().getType();
+ }
+ this.comparator = comparator;
+ this.timSortThreshold = timSortThreshold;
+ }
+
+ public void printStat() {
+ LOG.info("- msdRadixSortTime: " + msdRadixSortTime + " ms");
+ LOG.info("\t|- histogramPrepareTime: " + histogramPrepareTime + " ms");
+ LOG.info("\t\t|- histogramBuildTime: " + histogramBuildTime + " ms");
+ LOG.info("\t|- swapTime: " + swapTime + " ms");
+ LOG.info("- msdRadixSortCall: " + msdRadixSortCall + " times");
+ }
+ }
+
+ /**
+  * Entry method. Sorts the tuples of the given list and writes them back into the same list.
+  *
+  * @param queryContext query context; the Tim sort threshold is read from it through
+  *                     {@code SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT}
+  * @param list tuples to sort; its elements are overwritten with the sorted order
+  * @param schema input schema
+  * @param sortSpecs sort specs
+  * @param comp comparator for Tim sort
+  * @return the given list, holding the sorted tuples
+  */
+ public static List<UnSafeTuple> sort(QueryContext queryContext, UnSafeTupleList list, Schema schema, SortSpec[] sortSpecs,
+                                      Comparator<UnSafeTuple> comp) {
+   final UnSafeTuple[] tuples = list.toArray(new UnSafeTuple[list.size()]);
+   final int threshold = queryContext.getInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT);
+   final RadixSortContext context = new RadixSortContext(tuples, schema, sortSpecs, comp, threshold);
+
+   // Sort the whole array, starting from the first sort key.
+   final long startedAt = System.currentTimeMillis();
+   recursiveCallForNextKey(context, 0, context.in.length, 0);
+   context.msdRadixSortTime += System.currentTimeMillis() - startedAt;
+   context.printStat();
+
+   // Copy the sorted tuples back into the list, position by position.
+   final ListIterator<UnSafeTuple> cursor = list.listIterator();
+   final UnSafeTuple[] sorted = context.in;
+   for (int i = 0; i < sorted.length; i++) {
+     cursor.next();
+     cursor.set(sorted[i]);
+   }
+   return list;
+ }
+
+ static void recursiveCallForNextKey(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx) {
+ if (needConsiderSign(context.sortKeyTypes[curSortKeyIdx])) {
+ if (TajoTypeUtil.isReal(context.sortKeyTypes[curSortKeyIdx])) {
+ msdTernaryRadixSort(context, start, exclusiveEnd, curSortKeyIdx, context.asc[curSortKeyIdx],
+ calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]));
+ } else {
+ msdRadixSort(context, start, exclusiveEnd, curSortKeyIdx, context.asc[curSortKeyIdx],
+ calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]), true);
+ }
+ } else {
+ msdRadixSort(context, start, exclusiveEnd, curSortKeyIdx, context.asc[curSortKeyIdx],
+ calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]), false);
+ }
+ }
+
+ static boolean needConsiderSign(Type type) {
+ switch (type) {
+ case INT2:
+ case INT4:
+ case INT8:
+ case TIME:
+ case TIMESTAMP:
+ case FLOAT4:
+ case FLOAT8:
+ return true;
+ case DATE:
+ case INET4:
+ return false;
+ default:
+ throw new TajoInternalError(new UnsupportedException(type.name()));
+ }
+ }
+
+ private static int getFieldOffset(long address, int fieldId) {
+ return PlatformDependent.getInt(address + (long)(SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
+ }
+
+ private static long getFieldAddr(long address, int fieldId) {
+ return address + getFieldOffset(address, fieldId);
+ }
+
+ /**
+ * Get a radix key from a column values of the given tuple.
+ * The sign of the column value should be considered.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullLastSignConsidered16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_LAST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ // For negative values, the key should be 1 ~ 32768. For positive values, the key should be 32769 ~ 65536.
+ key = PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) + _16BIT_SECOND_HALF_START_IDX;
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The sign of the column value should be considered.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullFirstSignConsidered16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_FIRST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ // For negative values, the key should be 1 ~ 32768. For positive values, the key should be 32769 ~ 65536.
+ key = PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) + _16BIT_SECOND_HALF_START_IDX;
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The sign of the column value should be considered.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullLastSignConsidered16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_LAST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ // For positive values, the key should be 1 ~ 32768. For negative values, the key should be 32769 ~ 65536.
+ key = _16BIT_FIRST_HALF_END_IDX - PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass));
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The sign of the column value should be considered.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullFirstSignConsidered16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_FIRST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ // For positive values, the key should be 1 ~ 32768. For negative values, the key should be 32769 ~ 65536.
+ key = _16BIT_FIRST_HALF_END_IDX - PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass));
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The return key is an unsigned short value.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullLast16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_LAST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 1 + (PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The return key is an unsigned short value.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullFirst16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_FIRST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 1 + (PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The return key is an unsigned short value.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullLast16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_LAST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = _16BIT_MAX_BIN_IDX - (PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 16-bit radix key from a column values of the given tuple.
+ * The return key is an unsigned short value.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullFirst16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _16BIT_NULL_FIRST_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = _16BIT_MAX_BIN_IDX - (PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+ }
+ return key;
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16AscNullLastSignConsideredHistogram(RadixSortContext context, int start, int exclusiveEnd,
+ int curSortKeyIdx, int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullLastSignConsidered16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16AscNullFirstSignConsideredHistogram(RadixSortContext context, int start, int exclusiveEnd,
+ int curSortKeyIdx, int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullFirstSignConsidered16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16DescNullLastSignConsideredHistogram(RadixSortContext context, int start, int exclusiveEnd,
+ int curSortKeyIdx, int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullLastSignConsidered16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16DescNullFirstSignConsideredHistogram(RadixSortContext context, int start, int exclusiveEnd,
+ int curSortKeyIdx, int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullFirstSignConsidered16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16AscNullLastHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullLast16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16AscNullFirstHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullFirst16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16DescNullLastHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullLast16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare16DescNullFirstHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullFirst16RadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ static void buildHistogram(RadixSortContext context, int start, int[] positions) {
+ long before = System.currentTimeMillis();
+ positions[0] += start;
+ for (int i = 0; i < positions.length - 1; i++) {
+ positions[i + 1] += positions[i];
+ }
+ context.histogramBuildTime += System.currentTimeMillis() - before;
+ }
+
+ // Number of bins used by the 16-bit passes: indexes 0 through 65537.
+ private final static int _16BIT_BIN_NUM = 65538;
+ // Bin that receives blank/null values when nulls come first.
+ private final static int _16BIT_NULL_FIRST_IDX = 0;
+ // Bin that receives blank/null values when nulls come last.
+ private final static int _16BIT_NULL_LAST_IDX = 65537;
+ // Largest bin index used for non-null values; descending unsigned keys are computed as this minus the value.
+ private final static int _16BIT_MAX_BIN_IDX = 65536;
+ // Descending sign-considered keys are computed as this minus the signed short.
+ private final static int _16BIT_FIRST_HALF_END_IDX = 32768;
+ // Ascending sign-considered keys are computed as the signed short plus this.
+ private final static int _16BIT_SECOND_HALF_START_IDX = 32769;
+ // Mask applied to a short so that it is read as an unsigned 16-bit value.
+ private final static int SHORT_UNSIGNED_MASK = 0xFFFF;
+
+ /**
+  * Sort the specified part of the input tuples.
+  * The tuples are distributed into bins by the 16-bit radix key of the current pass. Each bin holding more than
+  * one tuple is then sorted further: with Arrays.sort() when it is shorter than the Tim sort threshold, otherwise
+  * by a recursive radix sort on the next pass or, once the last pass of this key is done, on the next sort key.
+  *
+  * @param context radix sort context
+  * @param start start position of the part will be sorted.
+  * @param exclusiveEnd end position of the part will be sorted.
+  * @param curSortKeyIdx current sort key index
+  * @param asc ascending flag
+  * @param pass current pass
+  * @param considerSign a flag to represent that the sign must be considered
+  */
+ static void msdRadixSort(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx, boolean asc,
+                          int pass, boolean considerSign) {
+   context.msdRadixSortCall++;
+
+   // This array contains the end positions of bins. For example, suppose an input which consists of nulls and
+   // numbers of 1 ~ 9. The number of each numbers and null is 10. If this input is organized into 12 bins which
+   // have the equal length of 10, this array will contain the below values.
+   //
+   // Ex) asc, null first
+   //
+   // [0] [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] // 0th and 11th bins are reserved for null values
+   // 10  20  30  40  50  60  70  80  90  100 100  100
+   //
+   // If the considerSign flag is set, this array is used for both negative and positive values.
+   // The positions of both values are determined by the asc flag.
+   // When the asc flag is set, the first half of the array is used for negative values.
+   // Otherwise, the second half of the array is used for negative values.
+   //
+   // Note: too many recursive calls to msdRadixSort() can incur a lot of memory overhead because this array
+   // should be always newly created when it is called.
+   final int[] binEndIdx = new int[_16BIT_BIN_NUM];
+
+   // An array to cache radix keys which are gotten while building the histogram.
+   // Since getting keys is the most expensive part of this implementation, keys should be cached once they are
+   // gotten.
+   final int[] keys = context.keys;
+
+   // TODO: consider the current key type
+   long before = System.currentTimeMillis();
+   // Build a histogram.
+   // Call different methods depending on the sort spec of the current key. This is to avoid frequent branch
+   // mispredictions. This is effective because the below code block is the most expensive part of this
+   // implementation.
+   // TODO: code generation can simplify the below codes.
+   if (considerSign) {
+     if (asc) {
+       if (context.nullFirst[curSortKeyIdx]) {
+         prepare16AscNullFirstSignConsideredHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+             keys);
+       } else {
+         prepare16AscNullLastSignConsideredHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+             keys);
+       }
+     } else {
+       if (context.nullFirst[curSortKeyIdx]) {
+         prepare16DescNullFirstSignConsideredHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+             keys);
+       } else {
+         prepare16DescNullLastSignConsideredHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+             keys);
+       }
+     }
+   } else {
+     if (asc) {
+       if (context.nullFirst[curSortKeyIdx]) {
+         prepare16AscNullFirstHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+       } else {
+         prepare16AscNullLastHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+       }
+     } else {
+       if (context.nullFirst[curSortKeyIdx]) {
+         prepare16DescNullFirstHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+       } else {
+         prepare16DescNullLastHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+       }
+     }
+   }
+   context.histogramPrepareTime += System.currentTimeMillis() - before;
+
+   // Swap tuples if necessary.
+   // If every tuple has the same radix key, tuples don't have to be swapped.
+   // The check must run on the raw counts, i.e. before buildHistogram() converts them into end positions.
+   boolean needSwap = Arrays.stream(binEndIdx).filter(eachCount -> eachCount > 0).count() > 1;
+   buildHistogram(context, start, binEndIdx);
+   if (needSwap) {
+     before = System.currentTimeMillis();
+     final int[] binNextElemIdx = new int[_16BIT_BIN_NUM];
+     System.arraycopy(binEndIdx, 0, binNextElemIdx, 0, _16BIT_BIN_NUM);
+     // Each bin is filled backwards from its end position.
+     for (int i = start; i < exclusiveEnd; i++) {
+       context.out[--binNextElemIdx[keys[i]]] = context.in[i];
+     }
+     System.arraycopy(context.out, start, context.in, start, exclusiveEnd - start);
+     context.swapTime += System.currentTimeMillis() - before;
+   }
+
+   // Recursive call radix sort if necessary.
+   // Nothing is left to do only when this was the last pass of the last sort key.
+   if (pass > 0 || curSortKeyIdx < context.maxSortKeyId) {
+     final boolean nextKey = pass == 0;
+
+     // Bin 0 spans [start, binEndIdx[0]).
+     sortBin(context, start, binEndIdx[0], curSortKeyIdx, asc, pass, nextKey);
+
+     // Bin i + 1 spans [binEndIdx[i], binEndIdx[i + 1]). The bound is the nulls-last bin index so that the last
+     // bin (65537) is visited as well: tuples whose key is null and placed last still have to be ordered by the
+     // remaining sort keys. The loop stops early once every tuple of the range has been covered.
+     for (int i = 0; i < _16BIT_NULL_LAST_IDX && binEndIdx[i] < exclusiveEnd; i++) {
+       sortBin(context, binEndIdx[i], binEndIdx[i + 1], curSortKeyIdx, asc, pass, nextKey);
+     }
+   }
+ }
+
+ /**
+  * Sorts a single bin produced by msdRadixSort(). Bins with at most one tuple are left untouched.
+  *
+  * @param context radix sort context
+  * @param binStart start position of the bin
+  * @param binEnd end position (exclusive) of the bin
+  * @param curSortKeyIdx current sort key index
+  * @param asc ascending flag
+  * @param pass pass that produced the bin
+  * @param nextKey true when the current key is exhausted and sorting must continue with the next sort key
+  */
+ private static void sortBin(RadixSortContext context, int binStart, int binEnd, int curSortKeyIdx, boolean asc,
+                             int pass, boolean nextKey) {
+   final int len = binEnd - binStart;
+   if (len <= 1) {
+     return;
+   }
+
+   if (len < context.timSortThreshold) {
+     // Use the tim sort when the array length is sufficiently small.
+     Arrays.sort(context.in, binStart, binEnd, context.comparator);
+   } else if (nextKey) {
+     recursiveCallForNextKey(context, binStart, binEnd, curSortKeyIdx + 1);
+   } else {
+     // The sign only matters for the most significant pass, so later passes never consider it.
+     msdRadixSort(context, binStart, binEnd, curSortKeyIdx, asc, pass - 2, false);
+   }
+ }
+
+ // Below methods are only used for floating point types.
+
+ /**
+ * Get a 1-bit radix key from a column values of the given tuple.
+ * The keys of 0 and 3 are reserved for null values.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullFirst1bRadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = 0; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 2 - ((PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & 0xFFFF) >> 15);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 1-bit radix key from a column values of the given tuple.
+ * The keys of 0 and 3 are reserved for null values.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int ascNullLast1bRadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _1BIT_BIN_MAX_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 2 - ((PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & 0xFFFF) >> 15);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 1-bit radix key from a column values of the given tuple.
+ * The keys of 0 and 3 are reserved for null values.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullFirst1bRadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = 0; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 1 + ((PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & 0xFFFF) >> 15);
+ }
+ return key;
+ }
+
+ /**
+ * Get a 1-bit radix key from a column values of the given tuple.
+ * The keys of 0 and 3 are reserved for null values.
+ *
+ * @param tuple
+ * @param sortKeyId
+ * @param pass
+ * @return
+ */
+ static int descNullLast1bRadixKey(UnSafeTuple tuple, int sortKeyId, int pass) {
+ int key = _1BIT_BIN_MAX_IDX; // for null
+ if (!tuple.isBlankOrNull(sortKeyId)) {
+ key = 1 + ((PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) & 0xFFFF) >> 15);
+ }
+ return key;
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare1bAscNullFirstHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullFirst1bRadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare1bAscNullLastHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = ascNullLast1bRadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare1bDescNullFirstHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullFirst1bRadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ /**
+ * Calculate positions of the input tuples.
+ *
+ * @param context
+ * @param start
+ * @param exclusiveEnd
+ * @param curSortKeyIdx
+ * @param pass
+ * @param positions
+ * @param keys
+ */
+ static void prepare1bDescNullLastHistogram(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx,
+ int pass, int[] positions, int[] keys) {
+ for (int i = start; i < exclusiveEnd; i++) {
+ keys[i] = descNullLast1bRadixKey(context.in[i], context.sortKeyIds[curSortKeyIdx], pass);
+ positions[keys[i]] += 1;
+ }
+ }
+
+ // Number of bins used by the 1-bit pass: two null bins (0 and 3) and two value bins (1 and 2).
+ private final static int _1BIT_BIN_NUM = 4;
+ // Last bin index; receives blank/null values when nulls come last.
+ private final static int _1BIT_BIN_MAX_IDX = 3;
+
+ /**
+  * Sort the specified part of the input tuples when the current sort key has a floating point type.
+  * The tuples are split by the top bit of the key into nulls, values with the bit set and values with the bit
+  * clear; the two value groups are then sorted as unsigned values with msdRadixSort() or Arrays.sort().
+  * This method is called only once at the first pass.
+  *
+  * @param context radix sort context
+  * @param start start position of the part will be sorted
+  * @param exclusiveEnd end position of the part will be sorted
+  * @param curSortKeyIdx current sort key index
+  * @param asc ascending flag
+  * @param pass current pass
+  */
+ static void msdTernaryRadixSort(RadixSortContext context, int start, int exclusiveEnd, int curSortKeyIdx, boolean asc,
+                                 int pass) {
+   context.msdRadixSortCall++;
+
+   // End positions of the four bins: [0] nulls-first, [1] and [2] non-null values, [3] nulls-last.
+   // With an ascending order bin 1 holds the values whose top (sign) bit is set and bin 2 the others;
+   // with a descending order it is the other way around.
+   //
+   // Ex) asc, null first
+   //
+   // [ nulls ] [ negatives ] [ positives ] [ empty ]
+   //
+   final int[] binEndIdx = new int[_1BIT_BIN_NUM];
+
+   // Radix keys are cached here while the histogram is prepared, because computing them is the expensive part.
+   final int[] keys = context.keys;
+   final boolean nullsFirst = context.nullFirst[curSortKeyIdx];
+
+   // TODO: consider the current key type
+   // Build a histogram with the method specialized for this sort spec, to avoid branching per tuple.
+   // TODO: code generation can simplify the below codes.
+   final long histogramStartedAt = System.currentTimeMillis();
+   if (asc && nullsFirst) {
+     prepare1bAscNullFirstHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+   } else if (asc) {
+     prepare1bAscNullLastHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+   } else if (nullsFirst) {
+     prepare1bDescNullFirstHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+   } else {
+     prepare1bDescNullLastHistogram(context, start, exclusiveEnd, curSortKeyIdx, pass, binEndIdx, keys);
+   }
+   context.histogramPrepareTime += System.currentTimeMillis() - histogramStartedAt;
+
+   // Tuples only have to move when more than one bin is populated. The check uses the raw counts, so it must
+   // run before buildHistogram() turns them into end positions.
+   final boolean needSwap = Arrays.stream(binEndIdx).filter(eachCount -> eachCount > 0).count() > 1;
+   buildHistogram(context, start, binEndIdx);
+   if (needSwap) {
+     final long swapStartedAt = System.currentTimeMillis();
+     final int[] binNextElemIdx = binEndIdx.clone();
+     // Each bin is filled backwards from its end position.
+     for (int i = start; i < exclusiveEnd; i++) {
+       final int slot = --binNextElemIdx[keys[i]];
+       context.out[slot] = context.in[i];
+     }
+     System.arraycopy(context.out, start, context.in, start, exclusiveEnd - start);
+     context.swapTime += System.currentTimeMillis() - swapStartedAt;
+   }
+
+   // The bin with null values doesn't have to be sorted by this key anymore, so it goes to the next key directly.
+   if (curSortKeyIdx < context.maxSortKeyId) {
+     final int nullBinStart = nullsFirst ? start : binEndIdx[2];
+     final int nullBinEnd = nullsFirst ? binEndIdx[0] : binEndIdx[3];
+     recursiveCallForNextKey(context, nullBinStart, nullBinEnd, curSortKeyIdx + 1);
+   }
+
+   // Bin 1 is sorted in descending unsigned order.
+   final int firstBinLen = binEndIdx[1] - binEndIdx[0];
+   if (firstBinLen > 1) {
+     if (firstBinLen < context.timSortThreshold) {
+       // Use the tim sort when the array length is sufficiently small.
+       Arrays.sort(context.in, binEndIdx[0], binEndIdx[1], context.comparator);
+     } else {
+       msdRadixSort(context, binEndIdx[0], binEndIdx[1], curSortKeyIdx, false, pass, false);
+     }
+   }
+
+   // Bin 2 is sorted in ascending unsigned order.
+   final int secondBinLen = binEndIdx[2] - binEndIdx[1];
+   if (secondBinLen > 1) {
+     if (secondBinLen < context.timSortThreshold) {
+       // Use the tim sort when the array length is sufficiently small.
+       Arrays.sort(context.in, binEndIdx[1], binEndIdx[2], context.comparator);
+     } else {
+       msdRadixSort(context, binEndIdx[1], binEndIdx[2], curSortKeyIdx, true, pass, false);
+     }
+   }
+ }
+
+ /**
+  * Returns the pass to start with for the given type: its byte size minus two, but never less than zero.
+  *
+  * @param type sort key type
+  * @return the initial pass
+  */
+ static int calculateInitialPass(Type type) {
+   return Math.max(typeByteSize(type) - 2, 0);
+ }
+
+ static int typeByteSize(Type type) {
+ switch (type) {
+ case INT2:
+ return 2;
+ case INT4:
+ case FLOAT4:
+ return 4;
+ case INT8:
+ case FLOAT8:
+ return 8;
+ case INET4:
+ return 4;
+ case DATE:
+ return 4;
+ case TIME:
+ return 8;
+ case TIMESTAMP:
+ return 8;
+ default:
+ throw new TajoRuntimeException(new UnsupportedException(type.name()));
+ }
+ }
+
+ /**
+  * Returns whether this implementation supports the type of the given sort key or not.
+  * Only the fixed-width types handled by both {@link #needConsiderSign} and {@link #typeByteSize} are accepted.
+  * Every other type is rejected, including variable length types (e.g. CHAR, TEXT, BLOB) and 1 byte types
+  * (e.g. BOOLEAN, BIT), so that a key accepted here never makes those methods throw while sorting.
+  *
+  * @param sortSpec sort spec whose key type is checked
+  * @return true if the key type can be sorted with this radix sort, false otherwise
+  */
+ public static boolean isApplicableType(SortSpec sortSpec) {
+   switch (sortSpec.getSortKey().getDataType().getType()) {
+     // Must stay in sync with needConsiderSign() and typeByteSize().
+     case INT2:
+     case INT4:
+     case INT8:
+     case FLOAT4:
+     case FLOAT8:
+     case INET4:
+     case DATE:
+     case TIME:
+     case TIMESTAMP:
+       return true;
+     default:
+       return false;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index d627064..4d93017 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -449,7 +449,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningQueryMaster()));
} else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
if (aNodeStatus.getLastHeartbeatTime() > 0) {
- aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
+ aTuple.put(fieldId, DatumFactory.createTimestampDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
} else {
aTuple.put(fieldId, DatumFactory.createNullDatum());
}
@@ -503,7 +503,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningTasks()));
} else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
if (aNodeStatus.getLastHeartbeatTime() > 0) {
- aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
+ aTuple.put(fieldId, DatumFactory.createTimestampDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
} else {
aTuple.put(fieldId, DatumFactory.createNullDatum());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 08ff184..fed75cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -875,8 +875,8 @@ public class Stage implements EventHandler<StageEvent> {
new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
} else {
if(stage.getSynchronizedState() == StageState.INITED) {
- stage.taskScheduler.start();
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START));
+ stage.taskScheduler.start();
} else {
/* all tasks are killed before stage are inited */
if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
index a31ec5a..e856ac0 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
@@ -18,20 +18,20 @@
package org.apache.tajo.plan.logical;
-import java.util.Arrays;
-
import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
-
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.plan.PlanString;
import org.apache.tajo.util.TUtil;
+import java.util.Arrays;
+
public final class SortNode extends UnaryNode implements Cloneable {
public enum SortPurpose {
NORMAL,
STORAGE_SPECIFIED
}
+
@Expose private SortSpec [] sortKeys;
@Expose private SortPurpose sortPurpose;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 7d2c649..ee1317f 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -402,7 +402,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.5</version>
+ <version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
@@ -494,12 +494,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.19</version>
+ <version>2.19.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.19</version>
+ <version>2.19.1</version>
<configuration>
<trimStackTrace>false</trimStackTrace>
</configuration>
@@ -706,7 +706,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.15</version>
</plugin>
<plugin>
@@ -1338,7 +1337,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.19</version>
<configuration>
<aggregate>true</aggregate>
</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 20a5d5c..442519b 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -21,7 +21,6 @@ package org.apache.tajo.storage;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
import sun.nio.ch.DirectBuffer;
import java.io.DataInput;
@@ -32,43 +31,6 @@ import java.nio.ByteBuffer;
public class StorageUtil extends StorageConstants {
- public static int getColByteSize(Column col) {
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- return 1;
- case CHAR:
- return 1;
- case BIT:
- return 1;
- case INT2:
- return 2;
- case INT4:
- return 4;
- case INT8:
- return 8;
- case FLOAT4:
- return 4;
- case FLOAT8:
- return 8;
- case INET4:
- return 4;
- case INET6:
- return 32;
- case TEXT:
- return 256;
- case BLOB:
- return 256;
- case DATE:
- return 4;
- case TIME:
- return 8;
- case TIMESTAMP:
- return 8;
- default:
- return 0;
- }
- }
-
public static Path concatPath(String parent, String...childs) {
return concatPath(new Path(parent), childs);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 64316d1..2e88398 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -965,7 +965,7 @@ public class TestStorages {
VTuple tuple = new VTuple(index - 1);
index = 0;
- tuple.put(index++, DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)));
+ tuple.put(index++, DatumFactory.createTimestampDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)));
if (dateTypeSupport()) {
tuple.put(index++, DatumFactory.createDate("1980-04-01"));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
index 82c3be3..d7e1c3b 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -235,7 +235,7 @@ public abstract class JdbcScanner implements Scanner {
break;
case TIMESTAMP:
tuple.put(column_idx,
- DatumFactory.createTimestmpDatumWithJavaMillis(resultSet.getTimestamp(resultIdx).getTime()));
+ DatumFactory.createTimestampDatumWithJavaMillis(resultSet.getTimestamp(resultIdx).getTime()));
break;
case BINARY:
case VARBINARY:
[2/2] tajo git commit: TAJO-2109: Implement Radix sort.
Posted by ji...@apache.org.
TAJO-2109: Implement Radix sort.
Closes #992
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9afd9abe
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9afd9abe
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9afd9abe
Branch: refs/heads/master
Commit: 9afd9abe379cbef8c6ae2e17c19e280ed3ec2a07
Parents: 45100ce
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Apr 18 10:45:10 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Apr 18 10:45:10 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../main/java/org/apache/tajo/SessionVars.java | 3 +
.../apache/tajo/common/type/TajoTypeUtil.java | 19 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
.../main/java/org/apache/tajo/datum/Datum.java | 14 +-
.../org/apache/tajo/datum/DatumFactory.java | 6 +-
.../tajo/tuple/memory/UnSafeTupleList.java | 4 +
.../org/apache/tajo/datum/TestBytesDatum.java | 4 +-
.../apache/tajo/datum/TestTimestampDatum.java | 20 +-
tajo-core-tests/pom.xml | 12 +
.../tajo/engine/eval/TestSQLExpression.java | 2 +-
.../engine/function/TestDateTimeFunctions.java | 2 +-
.../planner/physical/TestExternalSortExec.java | 119 ++-
.../engine/planner/physical/TestRadixSort.java | 260 ++++++
.../apache/tajo/engine/query/TestSortQuery.java | 28 +-
.../apache/tajo/engine/util/BenchmarkSort.java | 239 +++++
.../queries/TestSortQuery/testSort.sql | 2 +-
.../queries/TestSortQuery/testSortDesc.sql | 2 +-
.../TestSortQuery/testSortWithAlias1.sql | 2 +-
.../testSortWithAliasButOriginalName.sql | 2 +-
.../queries/TestSortQuery/testSortWithExpr1.sql | 2 +-
.../queries/TestSortQuery/testTopK.sql | 2 +-
.../queries/TestSortQuery/testTopkWithJson.json | 8 +
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../engine/function/datetime/NowTimestamp.java | 2 +-
.../function/datetime/ToTimestampInt.java | 2 +-
.../engine/planner/UniformRangePartition.java | 6 +-
.../planner/physical/ExternalSortExec.java | 53 +-
.../tajo/engine/planner/physical/RadixSort.java | 921 +++++++++++++++++++
.../NonForwardQueryResultSystemScanner.java | 4 +-
.../java/org/apache/tajo/querymaster/Stage.java | 2 +-
.../org/apache/tajo/plan/logical/SortNode.java | 6 +-
tajo-project/pom.xml | 8 +-
.../org/apache/tajo/storage/StorageUtil.java | 38 -
.../org/apache/tajo/storage/TestStorages.java | 2 +-
.../apache/tajo/storage/jdbc/JdbcScanner.java | 2 +-
36 files changed, 1686 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7565faf..ac1a8aa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
NEW FEATURES
+ TAJO-2109: Implement Radix sort. (jihoon)
+
TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)
IMPROVEMENT
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index ba85549..ab00a41 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -165,6 +165,8 @@ public enum SessionVars implements ConfigKey {
COMPRESSED_RESULT_TRANSFER(ConfVars.$COMPRESSED_RESULT_TRANSFER, "Use compression to optimize result transmission.",
CLI_SIDE_VAR, Boolean.class, Validators.bool()),
+ SORT_ALGORITHM(ConfVars.$SORT_ALGORITHM, "sort algorithm", DEFAULT),
+
//-------------------------------------------------------------------------------
// Only for Unit Testing
//-------------------------------------------------------------------------------
@@ -174,6 +176,7 @@ public enum SessionVars implements ConfigKey {
TEST_FILTER_PUSHDOWN_ENABLED(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED, "filter push down enabled", TEST_VAR),
TEST_MIN_TASK_NUM(ConfVars.$TEST_MIN_TASK_NUM, "(test only) min task num", TEST_VAR),
TEST_PLAN_SHAPE_FIX_ENABLED(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED, "(test only) plan shape fix enabled", TEST_VAR),
+ TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT(ConfVars.$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, "(test only) Tim sort threshold for radix sort", TEST_VAR)
;
public static final Map<String, SessionVars> SESSION_VARS = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
index a70218d..ecaeeb1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java
@@ -175,9 +175,28 @@ public class TajoTypeUtil {
case DATE:
case TIME:
case TIMESTAMP:
+ case INET4:
case VARCHAR:
+ case CHAR:
case TEXT: return false;
default: return true;
}
}
+
+ public static boolean isNumeric(Type type) {
+ return isNumber(type) || isReal(type);
+ }
+
+ public static boolean isNumber(Type type) {
+ return
+ type == Type.INT2 ||
+ type == Type.INT4 ||
+ type == Type.INT8;
+ }
+
+ public static boolean isReal(Type type) {
+ return
+ type == Type.FLOAT4||
+ type == Type.FLOAT8;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index c36f43b..24a5520 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -367,6 +367,7 @@ public class TajoConf extends Configuration {
$AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000),
$SORT_LIST_SIZE("tajo.executor.sort.list.size", 100000),
$JOIN_HASH_TABLE_SIZE("tajo.executor.join.hash-table.size", 100000),
+ $SORT_ALGORITHM("tajo.executor.sort.algorithm", "TIM"),
// for index
$INDEX_ENABLED("tajo.query.index.enabled", false),
@@ -399,6 +400,7 @@ public class TajoConf extends Configuration {
$TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true),
$TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
$TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false), // used for explain statement test
+ $TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-threshold", 65536),
// Behavior Control ---------------------------------------------------------
$BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
index 6aa11ce..e2173a8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
@@ -20,10 +20,11 @@ package org.apache.tajo.datum;
import com.google.gson.annotations.Expose;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.common.type.TajoTypeUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.exception.InvalidValueForCastException;
import org.apache.tajo.exception.InvalidOperationException;
+import org.apache.tajo.exception.InvalidValueForCastException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.json.CommonGsonHelper;
import org.apache.tajo.json.GsonObject;
@@ -120,20 +121,15 @@ public abstract class Datum implements Comparable<Datum>, GsonObject {
}
public boolean isNumeric() {
- return isNumber() || isReal();
+ return TajoTypeUtil.isNumeric(type);
}
public boolean isNumber() {
- return
- this.type == Type.INT2 ||
- this.type == Type.INT4 ||
- this.type == Type.INT8;
+ return TajoTypeUtil.isNumber(type);
}
public boolean isReal() {
- return
- this.type == Type.FLOAT4||
- this.type == Type.FLOAT8;
+ return TajoTypeUtil.isReal(type);
}
protected static void initAbortWhenDivideByZero(TajoConf tajoConf) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index dd4a4e4..e9ac0c5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -289,12 +289,12 @@ public class DatumFactory {
return new TimeDatum(DateTimeUtil.toTime(tm));
}
- public static TimestampDatum createTimestmpDatumWithJavaMillis(long millis) {
+ public static TimestampDatum createTimestampDatumWithJavaMillis(long millis) {
return new TimestampDatum(DateTimeUtil.javaTimeToJulianTime(millis));
}
- public static TimestampDatum createTimestmpDatumWithUnixTime(int unixTime) {
- return createTimestmpDatumWithJavaMillis(unixTime * 1000L);
+ public static TimestampDatum createTimestampDatumWithUnixTime(int unixTime) {
+ return createTimestampDatumWithJavaMillis(unixTime * 1000L);
}
public static TimestampDatum createTimestamp(String datetimeStr) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
index 4c4a6cb..7bad396 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java
@@ -54,6 +54,10 @@ public class UnSafeTupleList extends ArrayList<UnSafeTuple> {
}
+ public DataType[] getDataTypes() {
+ return dataTypes;
+ }
+
@Override
public boolean add(UnSafeTuple tuple) {
return addTuple(tuple);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
index 4dcbbee..c3a0e84 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java
@@ -25,9 +25,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestBytesDatum {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
index f82f66d..68b34a6 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java
@@ -47,33 +47,33 @@ public class TestTimestampDatum {
@Test
public final void testType() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(Type.TIMESTAMP, d.type());
}
@Test(expected = TajoRuntimeException.class)
public final void testAsInt4() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
d.asInt4();
}
@Test
public final void testAsInt8() {
- Datum d = DatumFactory.createTimestmpDatumWithJavaMillis(unixtime * 1000);
+ Datum d = DatumFactory.createTimestampDatumWithJavaMillis(unixtime * 1000);
long javaTime = unixtime * 1000;
assertEquals(DateTimeUtil.javaTimeToJulianTime(javaTime), d.asInt8());
}
@Test(expected = TajoRuntimeException.class)
public final void testAsFloat4() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
d.asFloat4();
}
@Test(expected = TajoRuntimeException.class)
public final void testAsFloat8() {
int instance = 1386577582;
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(instance);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(instance);
d.asFloat8();
}
@@ -97,7 +97,7 @@ public class TestTimestampDatum {
@Test
public final void testSize() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(TimestampDatum.SIZE, d.asByteArray().length);
}
@@ -112,7 +112,7 @@ public class TestTimestampDatum {
@Test
public final void testToJson() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
Datum copy = CommonGsonHelper.fromJson(d.toJson(), Datum.class);
assertEquals(d, copy);
}
@@ -168,12 +168,12 @@ public class TestTimestampDatum {
assertEquals(uTime, DateTimeUtil.julianTimeToEpoch(julianTimestamp));
assertEquals(jTime, DateTimeUtil.julianTimeToJavaTime(julianTimestamp));
- TimestampDatum datum3 = DatumFactory.createTimestmpDatumWithJavaMillis(jTime);
+ TimestampDatum datum3 = DatumFactory.createTimestampDatumWithJavaMillis(jTime);
assertEquals(cal.get(Calendar.YEAR), datum3.getYear());
assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear());
assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth());
- datum3 = DatumFactory.createTimestmpDatumWithUnixTime(uTime);
+ datum3 = DatumFactory.createTimestampDatumWithUnixTime(uTime);
assertEquals(cal.get(Calendar.YEAR), datum3.getYear());
assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear());
assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth());
@@ -182,7 +182,7 @@ public class TestTimestampDatum {
@Test
public final void testNull() {
- Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
assertEquals(Boolean.FALSE,d.equals(DatumFactory.createNullDatum()));
assertEquals(DatumFactory.createNullDatum(),d.equalsTo(DatumFactory.createNullDatum()));
assertEquals(-1,d.compareTo(DatumFactory.createNullDatum()));
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 6de5546..b12642a 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -350,6 +350,18 @@
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>1.11.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>1.11.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
index 2db826b..fa4561a 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java
@@ -860,7 +860,7 @@ public class TestSQLExpression extends ExprTestBase {
TimeZone tz = TimeZone.getTimeZone("GMT-6");
int unixtime = 1389071574; // (int) (System.currentTimeMillis() / 1000);
- TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime(unixtime);
+ TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime(unixtime);
testSimpleEval(context, String.format("select to_timestamp(CAST(split_part('%d.999', '.', 1) as INT8));", unixtime),
new String[] {TimestampDatum.asChars(expected.asTimeMeta(), tz, false)});
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
index dc9bd25..36a4a60 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java
@@ -41,7 +41,7 @@ public class TestDateTimeFunctions extends ExprTestBase {
@Test
public void testToTimestamp() throws TajoException {
long expectedTimestamp = System.currentTimeMillis();
- TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime((int)(expectedTimestamp/ 1000));
+ TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime((int)(expectedTimestamp/ 1000));
// (expectedTimestamp / 1000) means the translation from millis seconds to unix timestamp
String q1 = String.format("select to_timestamp(%d);", (expectedTimestamp / 1000));
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 580fe86..788ebeb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -30,12 +30,15 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.physical.ExternalSortExec.SortAlgorithm;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
@@ -46,14 +49,20 @@ import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Random;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class TestExternalSortExec {
private TajoConf conf;
private TajoTestingCluster util;
@@ -61,12 +70,27 @@ public class TestExternalSortExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
+ private LogicalOptimizer optimizer;
private Path testDir;
+ private Schema tableSchema;
private final int numTuple = 1000;
private Random rnd = new Random(System.currentTimeMillis());
private TableDesc employee;
+ private String sortAlgorithmString;
+
+ public TestExternalSortExec(String sortAlgorithm) {
+ this.sortAlgorithmString = sortAlgorithm;
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {SortAlgorithm.TIM.name()},
+ {SortAlgorithm.MSD_RADIX.name()},
+ });
+ }
@Before
public void setUp() throws Exception {
@@ -79,33 +103,81 @@ public class TestExternalSortExec {
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
- Schema schema = SchemaFactory.newV1();
- schema.addColumn("managerid", Type.INT4);
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("deptname", Type.TEXT);
+ tableSchema = SchemaFactory.newV1(new Column[] {
+ new Column("managerid", Type.INT8),
+ new Column("empid", Type.INT4),
+ new Column("deptname", Type.TEXT),
+ new Column("col1", Type.INT8),
+ new Column("col2", Type.INT8),
+ new Column("col3", Type.INT8),
+ new Column("col4", Type.INT8),
+ new Column("col5", Type.INT8),
+ new Column("col6", Type.INT8),
+ new Column("col7", Type.INT8),
+ new Column("col8", Type.INT8),
+ new Column("col9", Type.INT8),
+ new Column("col10", Type.INT8),
+ new Column("col11", Type.INT8),
+ new Column("col12", Type.INT8)
+ });
TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
Path employeePath = new Path(testDir, "employee.csv");
Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
- .getAppender(employeeMeta, schema, employeePath);
+ .getAppender(employeeMeta, tableSchema, employeePath);
appender.enableStats();
appender.init();
- VTuple tuple = new VTuple(schema.size());
+ VTuple tuple = new VTuple(tableSchema.size());
for (int i = 0; i < numTuple; i++) {
- tuple.put(new Datum[] {
- DatumFactory.createInt4(rnd.nextInt(50)),
- DatumFactory.createInt4(rnd.nextInt(100)),
- DatumFactory.createText("dept_" + i),
- });
+ if (rnd.nextInt(1000) == 0) {
+ tuple.put(new Datum[] {
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ NullDatum.get(),
+ });
+ } else {
+ boolean positive = rnd.nextInt(2) == 0;
+ tuple.put(new Datum[]{
+ DatumFactory.createInt8(positive ? 100_000 + rnd.nextInt(100_000) : (100_000 + rnd.nextInt(100_000)) * -1),
+ DatumFactory.createInt4(rnd.nextInt(100)),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ DatumFactory.createInt8(100_000 + rnd.nextInt(50)),
+ });
+ }
appender.addTuple(tuple);
}
+
appender.flush();
appender.close();
- employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
+ employee = new TableDesc("default.employee", tableSchema, employeeMeta, employeePath.toUri());
catalog.createTable(employee);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+ optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
}
@After
@@ -122,7 +194,8 @@ public class TestExternalSortExec {
public final void testNext() throws IOException, TajoException {
conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2);
QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
- queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 1);
+ queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithmString);
+ queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 4);
FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getUri()), Integer.MAX_VALUE);
@@ -132,28 +205,32 @@ public class TestExternalSortExec {
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = plan.getRootBlock().getRoot();
+ LogicalNode rootNode = optimizer.optimize(plan);
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
- ProjectionExec proj = (ProjectionExec) exec;
Tuple tuple;
Tuple preVal = null;
Tuple curVal;
int cnt = 0;
exec.init();
- long start = System.currentTimeMillis();
- BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
+ Schema sortSchema = SchemaFactory.newV1(new Column[] {
+ new Column("managerid", Type.INT8),
+ new Column("empid", Type.INT4),
+ });
+
+ BaseTupleComparator comparator = new BaseTupleComparator(sortSchema,
new SortSpec[]{
- new SortSpec(new Column("managerid", Type.INT4)),
- new SortSpec(new Column("empid", Type.INT4))
+ new SortSpec(new Column("managerid", Type.INT8)),
+ new SortSpec(new Column("empid", Type.INT4)),
});
+ long start = System.currentTimeMillis();
while ((tuple = exec.next()) != null) {
curVal = tuple;
if (preVal != null) {
- assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+ assertTrue("prev: " + preVal + ", but cur: " + curVal + ", cnt: " + cnt, comparator.compare(preVal, curVal) <= 0);
}
preVal = new VTuple(curVal);
cnt++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
new file mode 100644
index 0000000..8246834
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaFactory;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.physical.ExternalSortExec.UnSafeComparator;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
+import org.apache.tajo.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestRadixSort {
+ private final static QueryContext queryContext;
+ private static UnSafeTupleList tuples;
+ private static Schema schema;
+ private static final int tupleNum = 1000;
+ private static final Random random = new Random(System.currentTimeMillis());
+ private SortSpec[] sortSpecs;
+ private final static Datum MINUS_ONE = DatumFactory.createInt4(-1);
+
+ static {
+ queryContext = new QueryContext(new TajoConf());
+ queryContext.setInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, 0);
+
+ schema = SchemaFactory.newV1(new Column[]{
+ new Column("col0", Type.INT8),
+ new Column("col1", Type.INT4),
+ new Column("col2", Type.INT2),
+ new Column("col3", Type.DATE),
+ new Column("col4", Type.TIMESTAMP),
+ new Column("col5", Type.TIME),
+ new Column("col6", Type.INET4),
+ new Column("col7", Type.FLOAT4),
+ new Column("col8", Type.FLOAT8)
+ });
+ }
+
+ /**
+  * Holder for one sort-spec combination handed to the parameterized test.
+  * Its {@link #toString()} is what the "{0}" placeholder of the test name prints.
+  */
+ private static class Param {
+   final SortSpec[] sortSpecs;
+
+   public Param(SortSpec[] param) {
+     sortSpecs = param;
+   }
+
+   /** Joins the sort specs into a single string. */
+   @Override
+   public String toString() {
+     return StringUtils.join(sortSpecs);
+   }
+ }
+
+ /**
+  * Invoked by the Parameterized runner once per generated parameter.
+  *
+  * @param param wrapper holding the sort specs this test instance sorts by
+  */
+ public TestRadixSort(Param param) {
+   sortSpecs = param.sortSpecs;
+ }
+
+ /**
+  * Builds the parameter list: one single-key case per schema column, followed by five
+  * multi-key cases for each key count from 2 to 5.
+  *
+  * @return one {@code Object[]} per test instance, each holding a single {@link Param}
+  */
+ @Parameters(name = "{index}: {0}")
+ public static Collection<Object[]> generateParameters() {
+   final int numColumns = schema.size();
+   final List<Object[]> cases = new ArrayList<>();
+
+   // Single-key cases: every column once, with both boolean flags drawn at random.
+   for (int col = 0; col < numColumns; col++) {
+     SortSpec single = new SortSpec(schema.getColumn(col), random.nextBoolean(), random.nextBoolean());
+     cases.add(new Object[] {new Param(new SortSpec[] {single})});
+   }
+
+   // Multi-key cases: columns are picked at random and may repeat within one case.
+   for (int keyCount = 2; keyCount < 6; keyCount++) {
+     for (int round = 0; round < 5; round++) {
+       SortSpec[] keys = new SortSpec[keyCount];
+       for (int k = 0; k < keyCount; k++) {
+         Column picked = schema.getColumn(random.nextInt(numColumns));
+         keys[k] = new SortSpec(picked, random.nextBoolean(), random.nextBoolean());
+       }
+       cases.add(new Object[] {new Param(keys)});
+     }
+   }
+
+   return cases;
+ }
+
+ /**
+  * Rebuilds the shared tuple list before each test: {@code tupleNum - 6} randomly chosen
+  * tuples followed by a fixed tail of two max, two min and two null tuples.
+  */
+ @Before
+ public void setup() {
+   DataType[] columnTypes = schema.getRootColumns().stream()
+       .map(c -> c.getDataType())
+       .toArray(DataType[]::new);
+   tuples = new UnSafeTupleList(columnTypes, tupleNum);
+
+   // One scratch tuple is refilled and appended for every row.
+   VTuple scratch = new VTuple(schema.size());
+   for (int i = 0; i < tupleNum - 6; i++) {
+     // Null, max and min tuples are each drawn with probability 1/10; the rest are random.
+     int kind = random.nextInt(10);
+     if (kind == 0) {
+       makeNullTuple(scratch);
+     } else if (kind == 1) {
+       makeMaxTuple(scratch);
+     } else if (kind == 2) {
+       makeMinTuple(scratch);
+     } else {
+       makeRandomTuple(scratch);
+     }
+     tuples.addTuple(scratch);
+   }
+
+   // Guarantee at least two max, min and null tuples (appended as max, min, null, twice).
+   for (int round = 0; round < 2; round++) {
+     makeMaxTuple(scratch);
+     tuples.addTuple(scratch);
+     makeMinTuple(scratch);
+     tuples.addTuple(scratch);
+     makeNullTuple(scratch);
+     tuples.addTuple(scratch);
+   }
+ }
+
+ /** Releases the tuple list after each test. */
+ @After
+ public void teardown() {
+   tuples.release();
+ }
+
+ /**
+  * Overwrites {@code tuple} with nine null datums, one per column of the test schema.
+  *
+  * @param tuple tuple to overwrite
+  * @return the same {@code tuple} instance
+  */
+ private static Tuple makeNullTuple(Tuple tuple) {
+   Datum[] nulls = new Datum[9];
+   for (int col = 0; col < nulls.length; col++) {
+     nulls[col] = NullDatum.get();
+   }
+   tuple.put(nulls);
+   return tuple;
+ }
+
+ /**
+  * Overwrites {@code tuple} with one random value per column of the test schema.
+  *
+  * <p>The DATE, TIMESTAMP and TIME inputs are kept non-negative by masking off the sign bit.
+  * {@code Math.abs} is not used for this because it returns a negative number for
+  * {@code Integer.MIN_VALUE} and {@code Long.MIN_VALUE}.
+  *
+  * @param tuple tuple to overwrite
+  * @return the same {@code tuple} instance
+  */
+ private static Tuple makeRandomTuple(Tuple tuple) {
+   tuple.put(new Datum[]{
+       DatumFactory.createInt8(random.nextLong()),
+       DatumFactory.createInt4(random.nextInt()),
+       DatumFactory.createInt2((short) random.nextInt(Short.MAX_VALUE)),
+       DatumFactory.createDate(random.nextInt() & Integer.MAX_VALUE),
+       DatumFactory.createTimestamp(random.nextLong() & Long.MAX_VALUE),
+       DatumFactory.createTime(random.nextLong() & Long.MAX_VALUE),
+       DatumFactory.createInet4(random.nextInt()),
+       DatumFactory.createFloat4(random.nextFloat()),
+       DatumFactory.createFloat8(random.nextDouble())
+   });
+
+   // Columns 0-2 are the integer columns (INT8, INT4, INT2): negate each with probability 1/2.
+   for (int i = 0; i < 3; i++) {
+     if (random.nextBoolean()) {
+       tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE));
+     }
+   }
+
+   // Columns 7-8 are the floating-point columns (FLOAT4, FLOAT8): same treatment.
+   for (int i = 7; i < 9; i++) {
+     if (random.nextBoolean()) {
+       tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE));
+     }
+   }
+
+   return tuple;
+ }
+
+ /**
+  * Overwrites {@code tuple} with the maximum value of the Java type backing each column
+  * ({@code Long}, {@code Integer}, {@code Short}, {@code Float} or {@code Double} MAX_VALUE).
+  *
+  * @param tuple tuple to overwrite
+  * @return the same {@code tuple} instance
+  */
+ private static Tuple makeMaxTuple(Tuple tuple) {
+   Datum[] maxima = {
+       DatumFactory.createInt8(Long.MAX_VALUE),
+       DatumFactory.createInt4(Integer.MAX_VALUE),
+       DatumFactory.createInt2(Short.MAX_VALUE),
+       DatumFactory.createDate(Integer.MAX_VALUE),
+       DatumFactory.createTimestamp(Long.MAX_VALUE),
+       DatumFactory.createTime(Long.MAX_VALUE),
+       DatumFactory.createInet4(Integer.MAX_VALUE),
+       DatumFactory.createFloat4(Float.MAX_VALUE),
+       DatumFactory.createFloat8(Double.MAX_VALUE)
+   };
+   tuple.put(maxima);
+   return tuple;
+ }
+
+ /**
+  * Overwrites {@code tuple} with the smallest value this test uses for each column.
+  *
+  * <p>The DATE, TIMESTAMP and TIME columns use 0 as their minimum. The floating-point
+  * columns use {@code -Float.MAX_VALUE} and {@code -Double.MAX_VALUE}:
+  * {@code Float.MIN_VALUE} and {@code Double.MIN_VALUE} are the smallest <em>positive</em>
+  * values, not the most negative ones, so they would not exercise the lower bound.
+  *
+  * @param tuple tuple to overwrite
+  * @return the same {@code tuple} instance
+  */
+ private static Tuple makeMinTuple(Tuple tuple) {
+   tuple.put(new Datum[]{
+       DatumFactory.createInt8(Long.MIN_VALUE),
+       DatumFactory.createInt4(Integer.MIN_VALUE),
+       DatumFactory.createInt2(Short.MIN_VALUE),
+       DatumFactory.createDate(0),
+       DatumFactory.createTimestamp(0),
+       DatumFactory.createTime(0),
+       DatumFactory.createInet4(Integer.MIN_VALUE),
+       DatumFactory.createFloat4(-Float.MAX_VALUE),
+       DatumFactory.createFloat8(-Double.MAX_VALUE)
+   });
+
+   return tuple;
+ }
+
+ /**
+  * Sorts the prepared tuples with {@code RadixSort} and asserts that no adjacent pair is
+  * out of order according to a comparator built from the same schema and sort specs.
+  */
+ @Test
+ public void testSort() {
+   Comparator<UnSafeTuple> order = new UnSafeComparator(schema, sortSpecs);
+   RadixSort.sort(queryContext, tuples, schema, sortSpecs, order);
+
+   for (int i = 0, last = tuples.size() - 1; i < last; i++) {
+     assertTrue(tuples.get(i) + " precedes " + tuples.get(i + 1) + " at " + i,
+         order.compare(tuples.get(i), tuples.get(i + 1)) <= 0);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 582d0b0..ef3336d 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -25,26 +25,47 @@ import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
public class TestSortQuery extends QueryTestCaseBase {
- public TestSortQuery() {
+ public TestSortQuery(String sortAlgorithm) {
super(TajoConstants.DEFAULT_DATABASE_NAME);
Map<String, String> variables = new HashMap<>();
variables.put(SessionVars.SORT_LIST_SIZE.keyname(), "100");
+ variables.put(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithm);
client.updateSessionVariables(variables);
}
+ @AfterClass
+ public static void tearDown() throws Exception {
+ client.unsetSessionVariables(Arrays.asList(SessionVars.SORT_ALGORITHM.keyname()));
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {"TIM"},
+ {"MSD_RADIX"},
+ });
+ }
+
@Test
public final void testSort() throws Exception {
ResultSet res = executeQuery();
@@ -170,6 +191,8 @@ public class TestSortQuery extends QueryTestCaseBase {
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
+
+ executeString("drop table testSortWithDate");
}
}
@@ -188,6 +211,8 @@ public class TestSortQuery extends QueryTestCaseBase {
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
+
+ executeString("drop table table2");
}
@Test
@@ -446,6 +471,7 @@ public class TestSortQuery extends QueryTestCaseBase {
cleanupQuery(res);
} finally {
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+ executeString("drop table testOutOfScope");
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
new file mode 100644
index 0000000..1cc526f
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.planner.physical.TestExternalSortExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.parser.sql.SQLAnalyzer;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+@State(Scope.Benchmark)
+public class BenchmarkSort {
+ private TajoConf conf;
+ private TajoTestingCluster util;
+ private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/BenchmarkSort";
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private LogicalOptimizer optimizer;
+ private Path testDir;
+
+ private final int numTuple = 10000;
+ private Random rnd = new Random(System.currentTimeMillis());
+
+ private TableDesc employee;
+
+ String[] QUERIES = {
+ "select col0 from employee order by col0"
+ };
+
+ /**
+  * Thread-scoped JMH state object passed to each benchmark method.
+  * NOTE(review): {@code sortBufferSize} is not read anywhere in this class.
+  */
+ @State(Scope.Thread)
+ public static class BenchContext {
+   int sortBufferSize;
+ }
+
+ /**
+  * Starts a catalog cluster, writes {@code numTuple} rows to {@code employee.csv} in the test
+  * directory, registers the file as table {@code default.employee}, and creates the analyzer,
+  * planner and optimizer used by the benchmarks.
+  *
+  * <p>About one row in 10000 is all nulls; the others hold random values. The DATE, TIMESTAMP
+  * and TIME inputs are kept non-negative by masking off the sign bit, because
+  * {@code Math.abs} returns a negative number for {@code Integer.MIN_VALUE} and
+  * {@code Long.MIN_VALUE}.
+  *
+  * @throws Exception if the cluster, catalog or table file cannot be set up
+  */
+ @Setup
+ public void setup() throws Exception {
+   this.conf = new TajoConf();
+   util = new TajoTestingCluster();
+   util.startCatalogCluster();
+   catalog = util.getCatalogService();
+   testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+   catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+   catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+   conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+
+   Schema schema = SchemaFactory.newV1(new Column[] {
+       new Column("col0", Type.INT8),
+       new Column("col1", Type.INT4),
+       new Column("col2", Type.INT2),
+       new Column("col3", Type.DATE),
+       new Column("col4", Type.TIMESTAMP),
+       new Column("col5", Type.TIME),
+       new Column("col6", Type.INET4),
+       new Column("col7", Type.FLOAT4),
+       new Column("col8", Type.FLOAT8),
+       new Column("col9", Type.INT8),
+       new Column("col10", Type.INT8),
+       new Column("col11", Type.INT8),
+       new Column("col12", Type.INT8),
+       new Column("col13", Type.INT8),
+       new Column("col14", Type.INT8),
+   });
+
+   TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+   Path employeePath = new Path(testDir, "employee.csv");
+   Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+       .getAppender(employeeMeta, schema, employeePath);
+   appender.enableStats();
+   appender.init();
+   try {
+     VTuple tuple = new VTuple(schema.size());
+     for (int i = 0; i < numTuple; i++) {
+       if (rnd.nextInt(10000) == 0) {
+         tuple.put(new Datum[] {
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get(),
+             NullDatum.get()
+         });
+       } else {
+         tuple.put(new Datum[]{
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt4(rnd.nextInt()),
+             DatumFactory.createInt2((short) rnd.nextInt(Short.MAX_VALUE)),
+             DatumFactory.createDate(rnd.nextInt() & Integer.MAX_VALUE),
+             DatumFactory.createTimestamp(rnd.nextLong() & Long.MAX_VALUE),
+             DatumFactory.createTime(rnd.nextLong() & Long.MAX_VALUE),
+             DatumFactory.createInet4(rnd.nextInt()),
+             DatumFactory.createFloat4(rnd.nextFloat()),
+             DatumFactory.createFloat8(rnd.nextDouble()),
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt8(rnd.nextLong()),
+             DatumFactory.createInt8(rnd.nextLong())
+         });
+       }
+       appender.addTuple(tuple);
+     }
+
+     appender.flush();
+   } finally {
+     // Close the appender even when writing fails so the file handle is not leaked.
+     appender.close();
+   }
+
+   employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
+   catalog.createTable(employee);
+   analyzer = new SQLAnalyzer();
+   planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+   optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
+ }
+
+ /**
+  * Removes the benchmark's test directory, then shuts down the catalog cluster.
+  *
+  * @throws IOException declared by the original signature; raised if cleanup fails
+  */
+ @TearDown
+ public void tearDown() throws IOException {
+   CommonTestingUtil.cleanupTestDir(TEST_PATH);
+   util.shutdownCatalogCluster();
+ }
+
+ /** Benchmarks the sort query with {@code SORT_ALGORITHM} set to {@code "TIM"}. */
+ @Benchmark
+ @BenchmarkMode(Mode.All)
+ public void timSort(BenchContext context) throws InterruptedException, IOException, TajoException {
+   runSortQuery("TIM");
+ }
+
+ /** Benchmarks the sort query with {@code SORT_ALGORITHM} set to {@code "MSD_RADIX"}. */
+ @Benchmark
+ @BenchmarkMode(Mode.All)
+ public void msdRadixSort(BenchContext context) throws InterruptedException, IOException, TajoException {
+   runSortQuery("MSD_RADIX");
+ }
+
+ /**
+  * Plans {@code QUERIES[0]} against the first fragment of the employee table with the given
+  * sort algorithm, then drains the resulting physical executor.
+  *
+  * <p>The executor is closed in a {@code finally} block so it is released even when
+  * {@code next()} throws.
+  *
+  * @param sortAlgorithm value stored under the {@code SORT_ALGORITHM} session variable
+  */
+ private void runSortQuery(String sortAlgorithm) throws InterruptedException, IOException, TajoException {
+   QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+   queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 200);
+   queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithm);
+
+   FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
+       new Path(employee.getUri()), Integer.MAX_VALUE);
+   Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+   TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
+       LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
+   ctx.setEnforcer(new Enforcer());
+   Expr expr = analyzer.parse(QUERIES[0]);
+   LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+   LogicalNode rootNode = optimizer.optimize(plan);
+
+   PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+   PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+   exec.init();
+   try {
+     // Only the sort is being measured, so the produced tuples are discarded.
+     while (exec.next() != null) {}
+   } finally {
+     exec.close();
+   }
+ }
+
+ /**
+  * Runs the benchmarks of this class through the JMH runner, with one warmup iteration,
+  * one measurement iteration and one fork.
+  *
+  * @param args unused
+  * @throws RunnerException if the JMH run fails
+  */
+ public static void main(String[] args) throws RunnerException {
+   String benchmarkPattern = BenchmarkSort.class.getSimpleName();
+   Options options = new OptionsBuilder()
+       .include(benchmarkPattern)
+       .warmupIterations(1)
+       .measurementIterations(1)
+       .forks(1)
+       .build();
+
+   Runner runner = new Runner(options);
+   runner.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
index 7958002..ac79024 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey from lineitem order by l_orderkey;
\ No newline at end of file
+select l_linenumber, l_orderkey from lineitem order by l_orderkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
index 4252643..6636bed 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey from lineitem order by l_orderkey desc;
\ No newline at end of file
+select l_linenumber, l_orderkey from lineitem order by l_orderkey desc, l_linenumber asc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
index cd8be3e..fd88b7f 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
index 1d6396a..2be75a8 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
index 2aeba26..ee3edda 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql
@@ -1 +1 @@
-select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1;
\ No newline at end of file
+select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1, l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
index 331f3b4..65519f0 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql
@@ -1 +1 @@
-select l_orderkey, l_linenumber from lineitem order by l_orderkey desc limit 3;
\ No newline at end of file
+select l_orderkey, l_linenumber from lineitem order by l_orderkey desc, l_linenumber asc limit 3;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
index e3a264f..333037b 100644
--- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json
@@ -32,6 +32,14 @@
},
"IsAsc": false,
"IsNullFirst": false
+ },
+ {
+ "SortKey": {
+ "ColumnName": "l_linenumber",
+ "OpType": "Column"
+ },
+ "IsAsc": true,
+ "IsNullFirst": false
}
],
"Expr": {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 46e6b76..5a6198e 100644
--- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -48,4 +48,5 @@ Available Session Variables:
\set FETCH_ROWNUM [int value] - The number of rows to be fetched from Master at a time
\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
\set COMPRESSED_RESULT_TRANSFER [true or false] - Use compression to optimize result transmission.
+\set SORT_ALGORITHM [text value] - sort algorithm
\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
index dd0d195..2f298a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java
@@ -44,7 +44,7 @@ public class NowTimestamp extends GeneralFunction {
@Override
public Datum eval(Tuple params) {
if (datum == null) {
- datum = DatumFactory.createTimestmpDatumWithJavaMillis(System.currentTimeMillis());
+ datum = DatumFactory.createTimestampDatumWithJavaMillis(System.currentTimeMillis());
}
return datum;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
index 5468b19..63b725c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
@@ -49,6 +49,6 @@ public class ToTimestampInt extends GeneralFunction {
if (params.isBlankOrNull(0)) {
return NullDatum.get();
}
- return DatumFactory.createTimestmpDatumWithUnixTime(params.getInt4(0));
+ return DatumFactory.createTimestampDatumWithUnixTime(params.getInt4(0));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 02e397d..0b8199f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -657,13 +657,13 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
break;
case TIMESTAMP:
if (overflowFlag[i]) {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(
mergedRange.getStart().getInt8(i) + incs[i].longValue()));
} else {
if (sortSpecs[i].isAscending()) {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue()));
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue()));
} else {
- end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue()));
+ end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue()));
}
}
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index ff629c3..e269bf6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -36,11 +36,13 @@ import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.engine.planner.PhysicalPlanningException;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
@@ -53,10 +55,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -74,6 +73,12 @@ import java.util.concurrent.Future;
* </ul>
*/
public class ExternalSortExec extends SortExec {
+
+ /**
+ * In-memory sort algorithms this executor can use. The selected constant decides which
+ * implementation the private sort(UnSafeTupleList) method dispatches to.
+ */
+ enum SortAlgorithm{
+ // Dispatches to OffHeapRowBlockUtils.sort; also the default and the fallback choice.
+ // NOTE(review): presumably named after TimSort -- confirm against OffHeapRowBlockUtils.
+ TIM,
+ // Dispatches to RadixSort.sort; only selected when every sort key passes RadixSort.isApplicableType.
+ MSD_RADIX,
+ }
+
/** Class logger */
private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
/** The prefix of fragment name for intermediate */
@@ -117,6 +122,8 @@ public class ExternalSortExec extends SortExec {
/** total bytes of input data */
private long inputBytes;
+ private final SortAlgorithm sortAlgorithm;
+
private ExternalSortExec(final TaskAttemptContext context, final SortNode plan)
throws PhysicalPlanningException {
super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
@@ -133,6 +140,28 @@ public class ExternalSortExec extends SortExec {
this.localFS = new RawLocalFileSystem();
this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
this.inputStats = new TableStats();
+ this.sortAlgorithm = getSortAlgorithm(context.getQueryContext(), sortSpecs);
+ LOG.info(sortAlgorithm.name() + " sort is selected");
+ }
+
+  /**
+   * Chooses the in-memory sort algorithm for this executor.
+   *
+   * <p>The requested algorithm name is read from the {@code SessionVars.SORT_ALGORITHM} session
+   * variable and compared case-insensitively against the {@link SortAlgorithm} constant names;
+   * when the variable is unset, {@link SortAlgorithm#TIM} is used. If any sort key is rejected by
+   * {@code RadixSort.isApplicableType}, {@link SortAlgorithm#TIM} is returned regardless of the
+   * request, and a warning is logged only when {@code MSD_RADIX} had been requested. An
+   * unrecognized name is logged as a warning and also falls back to {@link SortAlgorithm#TIM}.
+   *
+   * @param context query context holding the session variables
+   * @param sortSpecs sort keys of the plan; each is checked for radix-sort applicability
+   * @return the algorithm to use; never {@code null}
+   */
+  private static SortAlgorithm getSortAlgorithm(QueryContext context, SortSpec[] sortSpecs) {
+    String sortAlgorithm = context.get(SessionVars.SORT_ALGORITHM, SortAlgorithm.TIM.name());
+    // anyMatch stops at the first non-applicable key instead of counting all of them.
+    if (Arrays.stream(sortSpecs).anyMatch(sortSpec -> !RadixSort.isApplicableType(sortSpec))) {
+      if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) {
+        LOG.warn("Non-applicable types exist. Falling back to " + SortAlgorithm.TIM.name() + " sort");
+      }
+      return SortAlgorithm.TIM;
+    }
+    if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.TIM.name())) {
+      return SortAlgorithm.TIM;
+    } else if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) {
+      return SortAlgorithm.MSD_RADIX;
+    } else {
+      LOG.warn("Unknown sort type: " + sortAlgorithm);
+      LOG.warn("Falling back to " + SortAlgorithm.TIM.name() + " sort");
+      return SortAlgorithm.TIM;
+    }
   }
public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode,
@@ -172,6 +201,18 @@ public class ExternalSortExec extends SortExec {
return this.plan;
}
+  /**
+   * Sorts a block of tuples with the algorithm that was selected for this executor.
+   *
+   * @param tupleBlock tuples to be sorted
+   * @return the list returned by the selected sort implementation
+   * @throws TajoRuntimeException wrapping an {@code UnsupportedException} if the selected
+   *     algorithm has no branch here
+   */
+  private List<UnSafeTuple> sort(UnSafeTupleList tupleBlock) {
+    if (sortAlgorithm == SortAlgorithm.TIM) {
+      return OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator);
+    }
+    if (sortAlgorithm == SortAlgorithm.MSD_RADIX) {
+      return RadixSort.sort(context.getQueryContext(), tupleBlock, inSchema, sortSpecs, unSafeComparator);
+    }
+    // Both known algorithms are handled above, so reaching this point means a new constant was
+    // added without a matching branch; fail loudly rather than skip the sort.
+    throw new TajoRuntimeException(new UnsupportedException(sortAlgorithm.name()));
+  }
+
/**
* Sort a tuple block and store them into a chunk file
*/
@@ -180,7 +221,7 @@ public class ExternalSortExec extends SortExec {
int rowNum = tupleBlock.size();
long sortStart = System.currentTimeMillis();
- OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator);
+ this.sort(tupleBlock);
long sortEnd = System.currentTimeMillis();
long chunkWriteStart = System.currentTimeMillis();
@@ -527,7 +568,7 @@ public class ExternalSortExec extends SortExec {
if (chunk.isMemory()) {
long sortStart = System.currentTimeMillis();
- OffHeapRowBlockUtils.sort(inMemoryTable, unSafeComparator);
+ this.sort(inMemoryTable);
Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem());
if(LOG.isDebugEnabled()) {
debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)