You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 12:33:39 UTC
[40/50] [abbrv] carbondata git commit: Revert
"[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp
row"
Revert "[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp row"
This reverts commit de92ea9a123b17d903f2d1d4662299315c792954.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/46031a32
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/46031a32
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/46031a32
Branch: refs/heads/carbonstore-rebase5
Commit: 46031a320506ceed10b2134710be6c630c6ee533
Parents: 1d85e91
Author: Jacky Li <ja...@qq.com>
Authored: Sat Feb 10 20:11:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Mar 4 20:32:13 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/util/NonDictionaryUtil.java | 67 ++-
.../presto/util/CarbonDataStoreCreator.scala | 1 +
.../load/DataLoadProcessorStepOnSpark.scala | 6 +-
.../loading/row/IntermediateSortTempRow.java | 117 -----
.../loading/sort/SortStepRowHandler.java | 466 -------------------
.../loading/sort/SortStepRowUtil.java | 103 ++++
.../sort/unsafe/UnsafeCarbonRowPage.java | 331 +++++++++++--
.../loading/sort/unsafe/UnsafeSortDataRows.java | 57 ++-
.../unsafe/comparator/UnsafeRowComparator.java | 95 ++--
.../UnsafeRowComparatorForNormalDIms.java | 59 +++
.../UnsafeRowComparatorForNormalDims.java | 59 ---
.../sort/unsafe/holder/SortTempChunkHolder.java | 3 +-
.../holder/UnsafeFinalMergePageHolder.java | 19 +-
.../unsafe/holder/UnsafeInmemoryHolder.java | 21 +-
.../holder/UnsafeSortTempFileChunkHolder.java | 138 ++++--
.../merger/UnsafeIntermediateFileMerger.java | 118 ++++-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 27 +-
.../merger/CompactionResultSortProcessor.java | 1 +
.../sort/sortdata/IntermediateFileMerger.java | 95 +++-
.../IntermediateSortTempRowComparator.java | 73 ---
.../sort/sortdata/NewRowComparator.java | 5 +-
.../sortdata/NewRowComparatorForNormalDims.java | 3 +-
.../processing/sort/sortdata/RowComparator.java | 94 ++++
.../sortdata/RowComparatorForNormalDims.java | 62 +++
.../SingleThreadFinalSortFilesMerger.java | 25 +-
.../processing/sort/sortdata/SortDataRows.java | 85 +++-
.../sort/sortdata/SortTempFileChunkHolder.java | 174 +++++--
.../sort/sortdata/TableFieldStat.java | 176 -------
28 files changed, 1294 insertions(+), 1186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
index fca1244..d6ecfbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -82,26 +82,18 @@ public class NonDictionaryUtil {
}
/**
- * Method to get the required dictionary Dimension from obj []
+ * Method to get the required Dimension from obj []
*
* @param index
* @param row
* @return
*/
- public static int getDictDimension(int index, Object[] row) {
- int[] dimensions = (int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+ public static Integer getDimension(int index, Object[] row) {
+
+ Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+
return dimensions[index];
- }
- /**
- * Method to get the required non-dictionary & complex from 3-parted row
- * @param index
- * @param row
- * @return
- */
- public static byte[] getNoDictOrComplex(int index, Object[] row) {
- byte[][] nonDictArray = (byte[][]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
- return nonDictArray[index];
}
/**
@@ -116,11 +108,60 @@ public class NonDictionaryUtil {
return measures[index];
}
+ public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
+
+ return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+ }
+
public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
Object[] measureArray) {
+
out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
out[WriteStepRowUtil.MEASURE] = measureArray;
}
+
+ /**
+ * This method will extract the single dimension from the complete high card dims byte[].
+ * The format of the byte [] will be, Totallength,CompleteStartOffsets,Data
+ *
+ * @param highCardArr
+ * @param index
+ * @param highCardinalityCount
+ * @param outBuffer
+ */
+ public static void extractSingleHighCardDims(byte[] highCardArr, int index,
+ int highCardinalityCount, ByteBuffer outBuffer) {
+ ByteBuffer buff = null;
+ short secIndex = 0;
+ short firstIndex = 0;
+ int length;
+ // if the requested index is a last one then we need to calculate length
+ // based on byte[] length.
+ if (index == highCardinalityCount - 1) {
+ // need to read 2 bytes(1 short) to determine starting offset and
+ // length can be calculated by array length.
+ buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
+ } else {
+ // need to read 4 bytes(2 short) to determine starting offset and
+ // length.
+ buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
+ }
+
+ firstIndex = buff.getShort();
+ // if it is a last dimension in high card then this will be last
+ // offset.so calculate length from total length
+ if (index == highCardinalityCount - 1) {
+ secIndex = (short) highCardArr.length;
+ } else {
+ secIndex = buff.getShort();
+ }
+
+ length = secIndex - firstIndex;
+
+ outBuffer.position(firstIndex);
+ outBuffer.limit(outBuffer.position() + length);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 1bc9812..e768660 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -383,6 +383,7 @@ object CarbonDataStoreCreator {
.getInstance.createCache(CacheType.REVERSE_DICTIONARY)
for (i <- set.indices) {
+ // val dim = getDimension(dims, i).get
val columnIdentifier: ColumnIdentifier =
new ColumnIdentifier(dims.get(i).getColumnId, null, null)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..5124247 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(conf)
- val sortStepRowHandler = new SortStepRowHandler(sortParameters)
+ val sortStepRowUtil = new SortStepRowUtil(sortParameters)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
@@ -216,7 +216,7 @@ object DataLoadProcessorStepOnSpark {
override def next(): CarbonRow = {
val row =
- new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
+ new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
rowCounter.add(1)
row
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
deleted file mode 100644
index 8d351cf..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.row;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * During sort procedure, each row will be written to sort temp file in this logic format.
- * an intermediate sort temp row consists 3 parts:
- * dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, measure)
- */
-public class IntermediateSortTempRow {
- private int[] dictSortDims;
- private byte[][] noDictSortDims;
- private byte[] noSortDimsAndMeasures;
-
- public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
- byte[] noSortDimsAndMeasures) {
- this.dictSortDims = dictSortDims;
- this.noDictSortDims = noDictSortDims;
- this.noSortDimsAndMeasures = noSortDimsAndMeasures;
- }
-
- public int[] getDictSortDims() {
- return dictSortDims;
- }
-
- public byte[][] getNoDictSortDims() {
- return noDictSortDims;
- }
-
- public byte[] getNoSortDimsAndMeasures() {
- return noSortDimsAndMeasures;
- }
-
- /**
- * deserialize from bytes array to get the no sort fields
- * @param outDictNoSort stores the dict & no-sort fields
- * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
- * @param outMeasures stores the measure fields
- * @param dataTypes data type for the measure
- */
- public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
- Object[] outMeasures, DataType[] dataTypes) {
- ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
- // read dict_no_sort
- int dictNoSortCnt = outDictNoSort.length;
- for (int i = 0; i < dictNoSortCnt; i++) {
- outDictNoSort[i] = rowBuffer.getInt();
- }
-
- // read no_dict_no_sort (including complex)
- int noDictNoSortCnt = outNoDictNoSort.length;
- for (int i = 0; i < noDictNoSortCnt; i++) {
- short len = rowBuffer.getShort();
- byte[] bytes = new byte[len];
- rowBuffer.get(bytes);
- outNoDictNoSort[i] = bytes;
- }
-
- // read measure
- int measureCnt = outMeasures.length;
- DataType tmpDataType;
- Object tmpContent;
- for (short idx = 0 ; idx < measureCnt; idx++) {
- if ((byte) 0 == rowBuffer.get()) {
- outMeasures[idx] = null;
- continue;
- }
-
- tmpDataType = dataTypes[idx];
- if (DataTypes.BOOLEAN == tmpDataType) {
- if ((byte) 1 == rowBuffer.get()) {
- tmpContent = true;
- } else {
- tmpContent = false;
- }
- } else if (DataTypes.SHORT == tmpDataType) {
- tmpContent = rowBuffer.getShort();
- } else if (DataTypes.INT == tmpDataType) {
- tmpContent = rowBuffer.getInt();
- } else if (DataTypes.LONG == tmpDataType) {
- tmpContent = rowBuffer.getLong();
- } else if (DataTypes.DOUBLE == tmpDataType) {
- tmpContent = rowBuffer.getDouble();
- } else if (DataTypes.isDecimal(tmpDataType)) {
- short len = rowBuffer.getShort();
- byte[] decimalBytes = new byte[len];
- rowBuffer.get(decimalBytes);
- tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
- }
- outMeasures[idx] = tmpContent;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
deleted file mode 100644
index f31a2b9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
-
-/**
- * This class is used to convert/write/read row in sort step in carbondata.
- * It consists the following function:
- * 1. convert raw row & intermediate sort temp row to 3-parted row
- * 2. read/write intermediate sort temp row to sort temp file & unsafe memory
- * 3. write raw row directly to sort temp file & unsafe memory as intermediate sort temp row
- */
-public class SortStepRowHandler implements Serializable {
- private static final long serialVersionUID = 1L;
- private int dictSortDimCnt = 0;
- private int dictNoSortDimCnt = 0;
- private int noDictSortDimCnt = 0;
- private int noDictNoSortDimCnt = 0;
- private int measureCnt;
-
- // indices for dict & sort dimension columns
- private int[] dictSortDimIdx;
- // indices for dict & no-sort dimension columns
- private int[] dictNoSortDimIdx;
- // indices for no-dict & sort dimension columns
- private int[] noDictSortDimIdx;
- // indices for no-dict & no-sort dimension columns, including complex columns
- private int[] noDictNoSortDimIdx;
- // indices for measure columns
- private int[] measureIdx;
-
- private DataType[] dataTypes;
-
- /**
- * constructor
- * @param tableFieldStat table field stat
- */
- public SortStepRowHandler(TableFieldStat tableFieldStat) {
- this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt();
- this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
- this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
- this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
- this.measureCnt = tableFieldStat.getMeasureCnt();
- this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
- this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
- this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
- this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
- this.measureIdx = tableFieldStat.getMeasureIdx();
- this.dataTypes = tableFieldStat.getMeasureDataType();
- }
-
- /**
- * constructor
- * @param sortParameters sort parameters
- */
- public SortStepRowHandler(SortParameters sortParameters) {
- this(new TableFieldStat(sortParameters));
- }
-
- /**
- * Convert carbon row from raw format to 3-parted format.
- * This method is used in global-sort.
- *
- * @param row raw row whose length is the same as field number
- * @return 3-parted row whose length is 3. (1 for dict dims ,1 for non-dict and complex,
- * 1 for measures)
- */
- public Object[] convertRawRowTo3Parts(Object[] row) {
- Object[] holder = new Object[3];
- try {
- int[] dictDims
- = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
- Object[] measures = new Object[this.measureCnt];
-
- // convert dict & data
- int idxAcc = 0;
- for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
- dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]];
- }
-
- // convert dict & no-sort
- for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
- dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]];
- }
- // convert no-dict & sort
- idxAcc = 0;
- for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
- }
- // convert no-dict & no-sort
- for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
- }
-
- // convert measure data
- for (int idx = 0; idx < this.measureCnt; idx++) {
- measures[idx] = row[this.measureIdx[idx]];
- }
-
- NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
- } catch (Exception e) {
- throw new RuntimeException("Problem while converting row to 3 parts", e);
- }
- return holder;
- }
-
- /**
- * Convert intermediate sort temp row to 3-parted row.
- * This method is used in the final merge sort to feed rows to the next write step.
- *
- * @param sortTempRow intermediate sort temp row
- * @return 3-parted row
- */
- public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
- int[] dictDims
- = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] noDictArray
- = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
-
- int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
- byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
- Object[] measures = new Object[this.measureCnt];
-
- sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
-
- // dict dims
- System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
- 0, this.dictSortDimCnt);
- System.arraycopy(dictNoSortDims, 0, dictDims,
- this.dictSortDimCnt, this.dictNoSortDimCnt);;
-
- // no dict dims, including complex
- System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
- noDictArray, 0, this.noDictSortDimCnt);
- System.arraycopy(noDictNoSortDims, 0, noDictArray,
- this.noDictSortDimCnt, this.noDictNoSortDimCnt);
-
- // measures are already here
-
- Object[] holder = new Object[3];
- NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
- return holder;
- }
-
- /**
- * Read intermediate sort temp row from InputStream.
- * This method is used during the merge sort phase to read row from sort temp file.
- *
- * @param inputStream input stream
- * @return a row that contains three parts
- * @throws IOException if error occrus while reading from stream
- */
- public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
- DataInputStream inputStream) throws IOException {
- int[] dictSortDims = new int[this.dictSortDimCnt];
- byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
- // read dict & sort dim data
- for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
- dictSortDims[idx] = inputStream.readInt();
- }
-
- // read no-dict & sort data
- for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- short len = inputStream.readShort();
- byte[] bytes = new byte[len];
- inputStream.readFully(bytes);
- noDictSortDims[idx] = bytes;
- }
-
- // read no-dict dims & measures
- int len = inputStream.readInt();
- byte[] noSortDimsAndMeasures = new byte[len];
- inputStream.readFully(noSortDimsAndMeasures);
-
- return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
- }
-
- /**
- * Write intermediate sort temp row to OutputStream
- * This method is used during the merge sort phase to write row to sort temp file.
- *
- * @param sortTempRow intermediate sort temp row
- * @param outputStream output stream
- * @throws IOException if error occurs while writing to stream
- */
- public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow,
- DataOutputStream outputStream) throws IOException {
- // write dict & sort dim
- for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
- outputStream.writeInt(sortTempRow.getDictSortDims()[idx]);
- }
-
- // write no-dict & sort dim
- for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
- outputStream.writeShort(bytes.length);
- outputStream.write(bytes);
- }
-
- // write packed no-sort dim & measure
- outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length);
- outputStream.write(sortTempRow.getNoSortDimsAndMeasures());
- }
-
- /**
- * Write raw row as an intermediate sort temp row to sort temp file.
- * This method is used in the beginning of the sort phase. Comparing with converting raw row to
- * intermediate sort temp row and then writing the converted one, Writing raw row directly will
- * save the intermediate trivial loss.
- * This method use an array backend buffer to save memory allocation. The buffer will be reused
- * for all rows (per thread).
- *
- * @param row raw row
- * @param outputStream output stream
- * @param rowBuffer array backend buffer
- * @throws IOException if error occurs while writing to stream
- */
- public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
- DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
- // write dict & sort
- for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
- outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
- }
-
- // write no-dict & sort
- for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
- outputStream.writeShort(bytes.length);
- outputStream.write(bytes);
- }
-
- // pack no-sort
- rowBuffer.clear();
- packNoSortFieldsToBytes(row, rowBuffer);
- rowBuffer.flip();
- int packSize = rowBuffer.limit();
-
- // write no-sort
- outputStream.writeInt(packSize);
- outputStream.write(rowBuffer.array(), 0, packSize);
- }
-
- /**
- * Read intermediate sort temp row from unsafe memory.
- * This method is used during merge sort phase for off-heap sort.
- *
- * @param baseObject base object of memory block
- * @param address address of the row
- * @return intermediate sort temp row
- */
- public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
- long address) {
- int size = 0;
-
- int[] dictSortDims = new int[this.dictSortDimCnt];
- byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
- // read dict & sort dim
- for (int idx = 0; idx < dictSortDims.length; idx++) {
- dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- }
-
- // read no-dict & sort dim
- for (int idx = 0; idx < noDictSortDims.length; idx++) {
- short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- size += 2;
- byte[] bytes = new byte[length];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- size += length;
- noDictSortDims[idx] = bytes;
- }
-
- // read no-sort dims & measures
- int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- byte[] noSortDimsAndMeasures = new byte[len];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-
- return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
- }
-
- /**
- * Write intermediate sort temp row directly from unsafe memory to stream.
- * This method is used at the late beginning of the sort phase to write in-memory pages
- * to sort temp file. Comparing with reading intermediate sort temp row from memory and then
- * writing it, Writing directly from memory to stream will save the intermediate trivial loss.
- *
- * @param baseObject base object of the memory block
- * @param address base address of the row
- * @param outputStream output stream
- * @throws IOException if error occurs while writing to stream
- */
- public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject,
- long address, DataOutputStream outputStream) throws IOException {
- int size = 0;
-
- // dict & sort
- for (int idx = 0; idx < dictSortDimCnt; idx++) {
- outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, address + size));
- size += 4;
- }
-
- // no-dict & sort
- for (int idx = 0; idx < noDictSortDimCnt; idx++) {
- short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
- size += 2;
- byte[] bytes = new byte[length];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- size += length;
-
- outputStream.writeShort(length);
- outputStream.write(bytes);
- }
-
- // packed no-sort & measure
- int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
- size += 4;
- byte[] noSortDimsAndMeasures = new byte[len];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
- size += len;
-
- outputStream.writeInt(len);
- outputStream.write(noSortDimsAndMeasures);
- }
-
- /**
- * Write raw row as an intermediate sort temp row to memory.
- * This method is used in the beginning of the off-heap sort phase. Comparing with converting
- * raw row to intermediate sort temp row and then writing the converted one,
- * Writing raw row directly will save the intermediate trivial loss.
- * This method use an array backend buffer to save memory allocation. The buffer will be reused
- * for all rows (per thread).
- *
- * @param row raw row
- * @param baseObject base object of the memory block
- * @param address base address for the row
- * @param rowBuffer array backend buffer
- * @return number of bytes written to memory
- */
- public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
- Object baseObject, long address, ByteBuffer rowBuffer) {
- int size = 0;
- // write dict & sort
- for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
- CarbonUnsafe.getUnsafe()
- .putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]);
- size += 4;
- }
-
- // write no-dict & sort
- for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
- CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
- bytes.length);
- size += bytes.length;
- }
-
- // convert pack no-sort
- rowBuffer.clear();
- packNoSortFieldsToBytes(row, rowBuffer);
- rowBuffer.flip();
- int packSize = rowBuffer.limit();
-
- // write no-sort
- CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
- size += 4;
- CarbonUnsafe.getUnsafe()
- .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
- packSize);
- size += packSize;
- return size;
- }
-
- /**
- * Pack to no-sort fields to byte array
- *
- * @param row raw row
- * @param rowBuffer byte array backend buffer
- */
- private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
- // convert dict & no-sort
- for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
- rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
- }
- // convert no-dict & no-sort
- for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
- rowBuffer.putShort((short) bytes.length);
- rowBuffer.put(bytes);
- }
-
- // convert measure
- Object tmpValue;
- DataType tmpDataType;
- for (int idx = 0; idx < this.measureCnt; idx++) {
- tmpValue = row[this.measureIdx[idx]];
- tmpDataType = this.dataTypes[idx];
- if (null == tmpValue) {
- rowBuffer.put((byte) 0);
- continue;
- }
- rowBuffer.put((byte) 1);
- if (DataTypes.BOOLEAN == tmpDataType) {
- if ((boolean) tmpValue) {
- rowBuffer.put((byte) 1);
- } else {
- rowBuffer.put((byte) 0);
- }
- } else if (DataTypes.SHORT == tmpDataType) {
- rowBuffer.putShort((Short) tmpValue);
- } else if (DataTypes.INT == tmpDataType) {
- rowBuffer.putInt((Integer) tmpValue);
- } else if (DataTypes.LONG == tmpDataType) {
- rowBuffer.putLong((Long) tmpValue);
- } else if (DataTypes.DOUBLE == tmpDataType) {
- rowBuffer.putDouble((Double) tmpValue);
- } else if (DataTypes.isDecimal(tmpDataType)) {
- byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
- rowBuffer.putShort((short) decimalBytes.length);
- rowBuffer.put(decimalBytes);
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..c4e4756
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+/** Regroups a flat sort-step row into the 3-slot holder built by NonDictionaryUtil. */
+public class SortStepRowUtil {
+ private int measureCount;
+ private int dimensionCount;
+ private int complexDimensionCount;
+ private int noDictionaryCount;
+ private int[] dictDimIdx;
+ private int[] nonDictIdx;
+ private int[] measureIdx;
+ /** Precomputes which positions of a flat input row belong to each column group. */
+ public SortStepRowUtil(SortParameters parameters) {
+ this.measureCount = parameters.getMeasureColCount();
+ this.dimensionCount = parameters.getDimColCount();
+ this.complexDimensionCount = parameters.getComplexDimColCount();
+ this.noDictionaryCount = parameters.getNoDictionaryCount();
+ boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+
+ int index = 0;
+ int nonDicIndex = 0;
+ int allCount = 0;
+
+ // arrays start zero-filled: a slot left unassigned would silently read column 0
+ this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
+ this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
+ this.measureIdx = new int[measureCount];
+
+ // dimensions: no-dictionary positions go to nonDictIdx, dictionary ones to dictDimIdx
+ for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+ if (isNoDictionaryDimensionColumn[i]) {
+ nonDictIdx[nonDicIndex++] = i;
+ } else {
+ dictDimIdx[index++] = allCount;
+ }
+ allCount++;
+ }
+
+ // complex columns follow the dimensions and are appended to nonDictIdx
+ for (int i = 0; i < complexDimensionCount; i++) {
+ nonDictIdx[nonDicIndex++] = allCount;
+ allCount++;
+ }
+
+ // measure columns follow the complex columns
+ for (int i = 0; i < measureCount; i++) {
+ measureIdx[i] = allCount;
+ allCount++;
+ }
+ }
+ /** Splits {@code data} by the precomputed indices and packs it via NonDictionaryUtil.prepareOutObj. */
+ public Object[] convertRow(Object[] data) {
+ // holder has 3 slots (dims, high-cardinality/no-dict bytes, measures)
+ Object[] holder = new Object[3];
+ try {
+
+ int[] dictDims = new int[dimensionCount - noDictionaryCount];
+ byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
+ Object[] measures = new Object[measureCount];
+
+ // copy dictionary dimension values (stored as int)
+ for (int idx = 0; idx < dictDimIdx.length; idx++) {
+ dictDims[idx] = (int) data[dictDimIdx[idx]];
+ }
+
+ // copy no-dictionary and complex column bytes
+ for (int idx = 0; idx < nonDictIdx.length; idx++) {
+ nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
+ }
+
+ // copy measure values unchanged
+ for (int idx = 0; idx < measureIdx.length; idx++) {
+ measures[idx] = data[measureIdx[idx]];
+ }
+ NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
+
+ // any cast or index failure above is rethrown with row-conversion context
+ } catch (Exception e) {
+ throw new RuntimeException("Problem while converting row ", e);
+ }
+ // return the packed row
+ return holder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..e5583c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,20 +19,35 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
/**
* It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
*/
public class UnsafeCarbonRowPage {
+ /** Per dimension: true = short-length-prefixed bytes, false = 4-byte int in the page. */
+ private boolean[] noDictionaryDimensionMapping;
+ /** No-dictionary flags of the sort columns; exposed to the row comparators by getter. */
+ private boolean[] noDictionarySortColumnMapping;
+ /** Dimension count (plain + complex); measures start at this row index. */
+ private int dimensionSize;
+ /** Number of measure columns stored after the dimensions. */
+ private int measureSize;
+ /** Data type of each measure, indexed by measure position. */
+ private DataType[] measureDataType;
+ /** Reused per-row scratch bitmap, one bit per measure (set = non-null value). */
+ private long[] nullSetWords;
+
private IntPointerBuffer buffer;
private int lastSize;
@@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage {
private long taskId;
- private TableFieldStat tableFieldStat;
- private SortStepRowHandler sortStepRowHandler;
-
- public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
- boolean saveToDisk, long taskId) {
- this.tableFieldStat = tableFieldStat;
- this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+ public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+ boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
+ MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+ this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+ this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+ this.dimensionSize = dimensionSize;
+ this.measureSize = measureSize;
+ this.measureDataType = type;
this.saveToDisk = saveToDisk;
+ this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
this.taskId = taskId;
buffer = new IntPointerBuffer(this.taskId);
this.dataBlock = memoryBlock;
@@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage {
this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
}
- public int addRow(Object[] row, ByteBuffer rowBuffer) {
- int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+ public int addRow(Object[] row) {
+ int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
buffer.set(lastSize);
lastSize = lastSize + size;
return size;
}
- /**
- * add raw row as intermidiate sort temp row to page
- *
- * @param row
- * @param address
- * @return
- */
- private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
- return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
- dataBlock.getBaseObject(), address, rowBuffer);
+ private int addRow(Object[] row, long address) {
+ if (row == null) {
+ throw new RuntimeException("Row is null ??");
+ }
+ int dimCount = 0;
+ int size = 0;
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe()
+ .putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ } else {
+ int value = (int) row[dimCount];
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+ size += 4;
+ }
+ }
+ // row layout: dims, then nullSetWords as longs (bit set = non-null), then non-null measures
+ // complex dimensions: short length prefix + bytes (NOTE(review): wraps above 32767 bytes)
+ for (; dimCount < dimensionSize; dimCount++) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ }
+ Arrays.fill(nullSetWords, 0);
+ int nullSetSize = nullSetWords.length * 8;
+ int nullWordLoc = size;
+ size += nullSetSize;
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ Object value = row[mesCount + dimensionSize];
+ if (null != value) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ Boolean bval = (Boolean) value;
+ CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
+ size += 1;
+ } else if (dataType == DataTypes.SHORT) {
+ Short sval = (Short) value;
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+ size += 2;
+ } else if (dataType == DataTypes.INT) {
+ Integer ival = (Integer) value;
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+ size += 4;
+ } else if (dataType == DataTypes.LONG) {
+ Long val = (Long) value;
+ CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+ size += 8;
+ } else if (dataType == DataTypes.DOUBLE) {
+ Double doubleVal = (Double) value;
+ CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+ size += 8;
+ } else if (DataTypes.isDecimal(dataType)) {
+ BigDecimal decimalVal = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+ CarbonUnsafe.getUnsafe()
+ .putShort(baseObject, address + size, (short) bigDecimalInBytes.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ set(nullSetWords, mesCount);
+ } else {
+ unset(nullSetWords, mesCount);
+ }
+ }
+ CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+ address + nullWordLoc, nullSetSize);
+ return size;
}
- /**
- * get one row from memory address
- * @param address address
- * @return one row
- */
- public IntermediateSortTempRow getRow(long address) {
- return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
- dataBlock.getBaseObject(), address);
+ public Object[] getRow(long address, Object[] rowToFill) {
+ int dimCount = 0;
+ int size = 0;
+ // decodes the layout written by addRow into rowToFill and returns it
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimCount] = anInt;
+ }
+ }
+
+ // read complex dimensions (always short-length-prefixed bytes).
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ }
+ // null bitmap: one bit per measure; an unset bit means the measure is null
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+ size += 1;
+ rowToFill[dimensionSize + mesCount] = bval;
+ } else if (dataType == DataTypes.SHORT) {
+ Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ rowToFill[dimensionSize + mesCount] = sval;
+ } else if (dataType == DataTypes.INT) {
+ Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimensionSize + mesCount] = ival;
+ } else if (dataType == DataTypes.LONG) {
+ Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ } else if (dataType == DataTypes.DOUBLE) {
+ Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = doubleVal;
+ } else if (DataTypes.isDecimal(dataType)) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ } else {
+ rowToFill[dimensionSize + mesCount] = null;
+ }
+ }
+ return rowToFill;
}
- /**
- * write a row to stream
- * @param address address of a row
- * @param stream stream
- * @throws IOException
- */
- public void writeRow(long address, DataOutputStream stream) throws IOException {
- sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
- dataBlock.getBaseObject(), address, stream);
+ public void fillRow(long address, DataOutputStream stream) throws IOException {
+ int dimCount = 0;
+ int size = 0;
+ // re-emits a row written by addRow to the stream in the same field order
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(anInt);
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ }
+ // null bitmap goes to the stream as longs via writeLong (big-endian)
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+ for (int i = 0; i < nullSetWords.length; i++) {
+ stream.writeLong(nullSetWords[i]);
+ }
+ // NOTE(review): no BOOLEAN branch here although addRow writes BOOLEAN measures — confirm
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.SHORT) {
+ short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ stream.writeShort(sval);
+ } else if (dataType == DataTypes.INT) {
+ int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(ival);
+ } else if (dataType == DataTypes.LONG) {
+ long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ stream.writeLong(val);
+ } else if (dataType == DataTypes.DOUBLE) {
+ double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ stream.writeDouble(doubleVal);
+ } else if (DataTypes.isDecimal(dataType)) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ stream.writeShort(aShort);
+ stream.write(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ }
+ }
}
public void freeMemory() {
@@ -134,8 +362,27 @@ public class UnsafeCarbonRowPage {
return dataBlock;
}
- public TableFieldStat getTableFieldStat() {
- return tableFieldStat;
+ public static void set(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] |= (1L << index);
+ }
+ // long shifts use only the low 6 bits of index, so 1L << index selects the bit in its word
+ public static void unset(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] &= ~(1L << index);
+ }
+ // true when the bit for index is set (addRow sets it for non-null measures)
+ public static boolean isSet(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ return ((words[wordOffset] & (1L << index)) != 0);
+ }
+
+ public boolean[] getNoDictionaryDimensionMapping() {
+ return noDictionaryDimensionMapping;
+ }
+
+ public boolean[] getNoDictionarySortColumnMapping() {
+ return noDictionarySortColumnMapping;
}
public void setNewDataBlock(MemoryBlock newMemoryBlock) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..4dd5e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -42,14 +41,13 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
public class UnsafeSortDataRows {
@@ -71,8 +69,7 @@ public class UnsafeSortDataRows {
*/
private SortParameters parameters;
- private TableFieldStat tableFieldStat;
- private ThreadLocal<ByteBuffer> rowBuffer;
+
private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
private UnsafeCarbonRowPage rowPage;
@@ -97,13 +94,7 @@ public class UnsafeSortDataRows {
public UnsafeSortDataRows(SortParameters parameters,
UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
this.parameters = parameters;
- this.tableFieldStat = new TableFieldStat(parameters);
- this.rowBuffer = new ThreadLocal<ByteBuffer>() {
- @Override protected ByteBuffer initialValue() {
- byte[] backedArray = new byte[2 * 1024 * 1024];
- return ByteBuffer.wrap(backedArray);
- }
- };
+
this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
// observer of writing file in thread
@@ -136,7 +127,11 @@ public class UnsafeSortDataRows {
if (isMemoryAvailable) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
}
- this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+ this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
+ !isMemoryAvailable, taskId);
// Delete if any older file exists in sort temp folder
deleteSortLocationIfExists();
@@ -183,7 +178,7 @@ public class UnsafeSortDataRows {
private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
for (int i = 0; i < size; i++) {
if (rowPage.canAdd()) {
- bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+ bytesAdded += rowPage.addRow(rowBatch[i]);
} else {
try {
if (enableInMemoryIntermediateMerge) {
@@ -199,8 +194,15 @@ public class UnsafeSortDataRows {
if (!saveToDisk) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
}
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
- bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(),
+ memoryBlock,
+ saveToDisk, taskId);
+ bytesAdded += rowPage.addRow(rowBatch[i]);
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -218,7 +220,7 @@ public class UnsafeSortDataRows {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
if (rowPage.canAdd()) {
- rowPage.addRow(row, rowBuffer.get());
+ rowPage.addRow(row);
} else {
try {
if (enableInMemoryIntermediateMerge) {
@@ -233,8 +235,13 @@ public class UnsafeSortDataRows {
if (!saveToDisk) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
}
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
- rowPage.addRow(row, rowBuffer.get());
+ // dimensionSize must include complex dims, as at the other page-creation sites
+ rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(), parameters.getMeasureDataType(), memoryBlock,
+ saveToDisk, taskId);
+ rowPage.addRow(row);
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -262,7 +269,7 @@ public class UnsafeSortDataRows {
new UnsafeRowComparator(rowPage));
} else {
timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDims(rowPage));
+ new UnsafeRowComparatorForNormalDIms(rowPage));
}
unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
} else {
@@ -288,9 +295,10 @@ public class UnsafeSortDataRows {
// write number of entries to the file
stream.writeInt(actualSize);
for (int i = 0; i < actualSize; i++) {
- rowPage.writeRow(
- rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream);
+ rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
+ stream);
}
+
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
} finally {
@@ -359,7 +367,7 @@ public class UnsafeSortDataRows {
new UnsafeRowComparator(page));
} else {
timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDims(page));
+ new UnsafeRowComparatorForNormalDIms(page));
}
if (page.isSaveToDisk()) {
// create a new file every time
@@ -372,8 +380,7 @@ public class UnsafeSortDataRows {
writeDataToFile(page, sortTempFile);
LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+ " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
- + sortTempFile + ", sort temp file size in MB is "
- + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+ + sortTempFile);
page.freeMemory();
// add sort temp filename to and arrayList. When the list size reaches 20 then
// intermediate merging of sort temp files will be triggered
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 33342dc..d02be9b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -23,25 +23,63 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+ /**
+ * mapping of dictionary and no dictionary of sort_columns.
+ */
+ private boolean[] noDictionarySortColumnMaping;
+
private Object baseObject;
- private TableFieldStat tableFieldStat;
- private int dictSizeInMemory;
public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+ this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
this.baseObject = rowPage.getDataBlock().getBaseObject();
- this.tableFieldStat = rowPage.getTableFieldStat();
- this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
- + tableFieldStat.getDictNoSortDimCnt()) * 4;
}
/**
* Below method will be used to compare two mdkey
*/
public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
- return compare(rowL, baseObject, rowR, baseObject);
+ int diff = 0;
+ long rowA = rowL.address;
+ long rowB = rowR.address;
+ int sizeA = 0;
+ int sizeB = 0;
+ for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+ if (isNoDictionary) {
+ short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA);
+ byte[] byteArr1 = new byte[aShort1];
+ sizeA += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
+ sizeA += aShort1;
+
+ short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB);
+ byte[] byteArr2 = new byte[aShort2];
+ sizeB += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
+ sizeB += aShort2;
+
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+ sizeB += 4;
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ }
+
+ return diff;
}
/**
@@ -52,40 +90,35 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
int diff = 0;
long rowA = rowL.address;
long rowB = rowR.address;
- int sizeInDictPartA = 0;
-
- int sizeInNonDictPartA = 0;
- int sizeInDictPartB = 0;
- int sizeInNonDictPartB = 0;
- for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) {
+ int sizeA = 0;
+ int sizeB = 0;
+ for (boolean isNoDictionary : noDictionarySortColumnMaping) {
if (isNoDictionary) {
- short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
- rowA + dictSizeInMemory + sizeInNonDictPartA);
- byte[] byteArr1 = new byte[lengthA];
- sizeInNonDictPartA += 2;
+ short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
+ byte[] byteArr1 = new byte[aShort1];
+ sizeA += 2;
CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
- byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
- sizeInNonDictPartA += lengthA;
+ .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort1);
+ sizeA += aShort1;
- short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
- rowB + dictSizeInMemory + sizeInNonDictPartB);
- byte[] byteArr2 = new byte[lengthB];
- sizeInNonDictPartB += 2;
+ short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
+ byte[] byteArr2 = new byte[aShort2];
+ sizeB += 2;
CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
- byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
- sizeInNonDictPartB += lengthB;
+ .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort2);
+ sizeB += aShort2;
int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
if (difference != 0) {
return difference;
}
} else {
- int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);
- sizeInDictPartA += 4;
- int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeInDictPartB);
- sizeInDictPartB += 4;
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
+ sizeB += 4;
diff = dimFieldA - dimFieldB;
if (diff != 0) {
return diff;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+/**
+ * Orders unsafe rows by the 4-byte int values stored at the start of each row, one per sort
+ * column, compared column by column. NOTE(review): presumably dictionary surrogate keys.
+ */
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+ // Base object of the page's data block; both rows being compared are read from it.
+ private final Object baseObject;
+
+ private final int numberOfSortColumns;
+
+ public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+ this.baseObject = rowPage.getDataBlock().getBaseObject();
+ this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
+ }
+
+ /**
+ * Compares two rows column by column, returning at the first differing column. Uses
+ * Integer.compare because int subtraction can overflow and report the wrong order.
+ */
+ @Override
+ public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+ for (int i = 0; i < numberOfSortColumns; i++) {
+ // both rows share the same fixed-width layout, so one offset serves both
+ long offset = 4L * i;
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowL.address + offset);
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowR.address + offset);
+ int diff = Integer.compare(dimFieldA, dimFieldB);
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
deleted file mode 100644
index e9cfb1c..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-
-public class UnsafeRowComparatorForNormalDims implements Comparator<UnsafeCarbonRow> {
-
- private Object baseObject;
-
- private int numberOfSortColumns;
-
- public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) {
- this.baseObject = rowPage.getDataBlock().getBaseObject();
- this.numberOfSortColumns = rowPage.getTableFieldStat().getIsSortColNoDictFlags().length;
- }
-
- /**
- * Below method will be used to compare two mdkey
- */
- public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
- int diff = 0;
- long rowA = rowL.address;
- long rowB = rowR.address;
- int sizeA = 0;
- int sizeB = 0;
- for (int i = 0; i < numberOfSortColumns; i++) {
- int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
- sizeA += 4;
- int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
- sizeB += 4;
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- }
-
- return diff;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
index d790c41..686e855 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.processing.loading.sort.unsafe.holder;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
/**
@@ -29,7 +28,7 @@ public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
void readRow() throws CarbonSortKeyAndGroupByException;
- IntermediateSortTempRow getRow();
+ Object[] getRow();
int numberOfRows();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index a776db1..6b0cfa6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,10 +19,9 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
@@ -39,18 +38,21 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
private UnsafeCarbonRowPage[] rowPages;
- private IntermediateSortTempRowComparator comparator;
+ private NewRowComparator comparator;
- private IntermediateSortTempRow currentRow;
+ private Object[] currentRow;
+
+ private int columnSize;
public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
- boolean[] noDictSortColumnMapping) {
+ boolean[] noDictSortColumnMapping, int columnSize) {
this.actualSize = merger.getEntryCount();
this.mergedAddresses = merger.getMergedAddresses();
this.rowPageIndexes = merger.getRowPageIndexes();
this.rowPages = merger.getUnsafeCarbonRowPages();
LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+ this.comparator = new NewRowComparator(noDictSortColumnMapping);
+ this.columnSize = columnSize;
}
public boolean hasNext() {
@@ -61,11 +63,12 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
}
public void readRow() {
- currentRow = rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]);
+ currentRow = new Object[columnSize];
+ rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
counter++;
}
- public IntermediateSortTempRow getRow() {
+ public Object[] getRow() {
return currentRow;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/46031a32/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index cbcbbae..6f05088 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -19,9 +19,8 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
public class UnsafeInmemoryHolder implements SortTempChunkHolder {
@@ -34,18 +33,21 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
private UnsafeCarbonRowPage rowPage;
- private IntermediateSortTempRow currentRow;
+ private Object[] currentRow;
private long address;
- private IntermediateSortTempRowComparator comparator;
+ private NewRowComparator comparator;
- public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
+ private int columnSize;
+
+ public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+ int numberOfSortColumns) {
this.actualSize = rowPage.getBuffer().getActualSize();
this.rowPage = rowPage;
LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new IntermediateSortTempRowComparator(
- rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+ this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+ this.columnSize = columnSize;
}
public boolean hasNext() {
@@ -56,12 +58,13 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
}
public void readRow() {
+ currentRow = new Object[columnSize];
address = rowPage.getBuffer().get(counter);
- currentRow = rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset());
+ rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
counter++;
}
- public IntermediateSortTempRow getRow() {
+ public Object[] getRow() {
return currentRow;
}