You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/07 13:10:10 UTC
[46/50] [abbrv] carbondata git commit: [CARBONDATA-2836]Fixed data
loading performance issue
[CARBONDATA-2836] Fixed data loading performance issue
Problem: Data loading takes much longer when the number of records is high (e.g. 3.5 billion records).
Root cause: During the final merge, the conversion of sort temp rows is done in the main thread, which makes final-step processing slower.
Solution: Move the conversion logic to the pre-fetch thread so that it runs in parallel with merging. This is done only for the final (single-thread) merge; intermediate merges do not need to convert the no-sort columns, so they keep them packed as bytes.
This closes #2611
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f27efb3e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f27efb3e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f27efb3e
Branch: refs/heads/external-format
Commit: f27efb3e3757619ebdd822da8b6ab9737602de7b
Parents: b9e5106
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon Aug 6 19:00:27 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 7 17:02:07 2018 +0530
----------------------------------------------------------------------
.../loading/row/IntermediateSortTempRow.java | 109 ++---------
.../loading/sort/SortStepRowHandler.java | 194 ++++++++++++++++---
.../sort/unsafe/UnsafeCarbonRowPage.java | 14 +-
.../holder/UnsafeFinalMergePageHolder.java | 3 +
.../unsafe/holder/UnsafeInmemoryHolder.java | 1 +
.../holder/UnsafeSortTempFileChunkHolder.java | 19 +-
.../merger/UnsafeIntermediateFileMerger.java | 3 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 +-
.../sort/sortdata/IntermediateFileMerger.java | 3 +-
.../SingleThreadFinalSortFilesMerger.java | 2 +-
.../sort/sortdata/SortTempFileChunkHolder.java | 19 +-
11 files changed, 234 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 47b419e..1ad7879 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -16,12 +16,6 @@
*/
package org.apache.carbondata.processing.loading.row;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
/**
* During sort procedure, each row will be written to sort temp file in this logic format.
* an intermediate sort temp row consists 3 parts:
@@ -30,7 +24,16 @@ import org.apache.carbondata.core.util.DataTypeUtil;
public class IntermediateSortTempRow {
private int[] dictSortDims;
private byte[][] noDictSortDims;
+ /**
+ * this will be used for intermediate merger when
+ * no sort field and measure field will not be
+ * used for sorting
+ */
private byte[] noSortDimsAndMeasures;
+ /**
+ * for final merger keep the measures
+ */
+ private Object[] measures;
public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
byte[] noSortDimsAndMeasures) {
@@ -39,10 +42,21 @@ public class IntermediateSortTempRow {
this.noSortDimsAndMeasures = noSortDimsAndMeasures;
}
+ public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+ Object[] measures) {
+ this.dictSortDims = dictSortDims;
+ this.noDictSortDims = noDictSortDims;
+ this.measures = measures;
+ }
+
public int[] getDictSortDims() {
return dictSortDims;
}
+ public Object[] getMeasures() {
+ return measures;
+ }
+
public byte[][] getNoDictSortDims() {
return noDictSortDims;
}
@@ -50,87 +64,4 @@ public class IntermediateSortTempRow {
public byte[] getNoSortDimsAndMeasures() {
return noSortDimsAndMeasures;
}
-
- /**
- * deserialize from bytes array to get the no sort fields
- * @param outDictNoSort stores the dict & no-sort fields
- * @param outNoDictNoSort stores all no-dict & no-sort fields,
- * including complex and varchar fields
- * @param outMeasures stores the measure fields
- * @param dataTypes data type for the measure
- * @param varcharDimCnt number of varchar column
- * @param complexDimCnt number of complex column
- */
- public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
- Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt, int complexDimCnt) {
- ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
- // read dict_no_sort
- int dictNoSortCnt = outDictNoSort.length;
- for (int i = 0; i < dictNoSortCnt; i++) {
- outDictNoSort[i] = rowBuffer.getInt();
- }
-
- // read no_dict_no_sort
- int noDictNoSortCnt = outNoDictNoSort.length - varcharDimCnt - complexDimCnt;
- for (int i = 0; i < noDictNoSortCnt; i++) {
- short len = rowBuffer.getShort();
- byte[] bytes = new byte[len];
- rowBuffer.get(bytes);
- outNoDictNoSort[i] = bytes;
- }
-
- // read varchar dims
- for (int i = 0; i < varcharDimCnt; i++) {
- int len = rowBuffer.getInt();
- byte[] bytes = new byte[len];
- rowBuffer.get(bytes);
- outNoDictNoSort[i + noDictNoSortCnt] = bytes;
- }
-
- // read complex dims
- for (int i = 0; i < complexDimCnt; i++) {
- short len = rowBuffer.getShort();
- byte[] bytes = new byte[len];
- rowBuffer.get(bytes);
- outNoDictNoSort[i + noDictNoSortCnt + varcharDimCnt] = bytes;
- }
-
- // read measure
- int measureCnt = outMeasures.length;
- DataType tmpDataType;
- Object tmpContent;
- for (short idx = 0 ; idx < measureCnt; idx++) {
- if ((byte) 0 == rowBuffer.get()) {
- outMeasures[idx] = null;
- continue;
- }
-
- tmpDataType = dataTypes[idx];
- if (DataTypes.BOOLEAN == tmpDataType) {
- if ((byte) 1 == rowBuffer.get()) {
- tmpContent = true;
- } else {
- tmpContent = false;
- }
- } else if (DataTypes.SHORT == tmpDataType) {
- tmpContent = rowBuffer.getShort();
- } else if (DataTypes.INT == tmpDataType) {
- tmpContent = rowBuffer.getInt();
- } else if (DataTypes.LONG == tmpDataType) {
- tmpContent = rowBuffer.getLong();
- } else if (DataTypes.DOUBLE == tmpDataType) {
- tmpContent = rowBuffer.getDouble();
- } else if (DataTypes.isDecimal(tmpDataType)) {
- short len = rowBuffer.getShort();
- byte[] decimalBytes = new byte[len];
- rowBuffer.get(decimalBytes);
- tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
- }
- outMeasures[idx] = tmpContent;
- }
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 0118e4d..f6fc3ca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -160,49 +160,62 @@ public class SortStepRowHandler implements Serializable {
* @return 3-parted row
*/
public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
- int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] noDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
- + this.varcharDimCnt + this.complexDimCnt][];
-
- int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
- byte[][] noDictNoSortAndVarcharComplexDims
- = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt][];
- Object[] measures = new Object[this.measureCnt];
-
- sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharComplexDims, measures,
- this.dataTypes, this.varcharDimCnt, this.complexDimCnt);
+ Object[] out = new Object[3];
+ NonDictionaryUtil
+ .prepareOutObj(out, sortTempRow.getDictSortDims(), sortTempRow.getNoDictSortDims(),
+ sortTempRow.getMeasures());
+ return out;
+ }
- // dict dims
- System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
- 0, this.dictSortDimCnt);
- System.arraycopy(dictNoSortDims, 0, dictDims,
- this.dictSortDimCnt, this.dictNoSortDimCnt);;
+ /**
+ * Read intermediate sort temp row from InputStream.
+ * This method is used during the intermediate merge sort phase to read row from sort temp file.
+ *
+ * @param inputStream input stream
+ * @return a row that contains three parts
+ * @throws IOException if error occrus while reading from stream
+ */
+ public IntermediateSortTempRow readWithoutNoSortFieldConvert(
+ DataInputStream inputStream) throws IOException {
+ int[] dictSortDims = new int[this.dictSortDimCnt];
+ byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
- // no dict dims, including complex
- System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
- noDictArray, 0, this.noDictSortDimCnt);
- System.arraycopy(noDictNoSortAndVarcharComplexDims, 0, noDictArray,
- this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt);
+ // read dict & sort dim data
+ for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+ dictSortDims[idx] = inputStream.readInt();
+ }
- // measures are already here
+ // read no-dict & sort data
+ for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+ short len = inputStream.readShort();
+ byte[] bytes = new byte[len];
+ inputStream.readFully(bytes);
+ noDictSortDims[idx] = bytes;
+ }
- Object[] holder = new Object[3];
- NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
- return holder;
+ // read no-dict dims & measures
+ int len = inputStream.readInt();
+ byte[] noSortDimsAndMeasures = new byte[len];
+ inputStream.readFully(noSortDimsAndMeasures);
+ // keeping no sort fields and measure in pack byte array as it will not participate in sort
+ return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
}
/**
* Read intermediate sort temp row from InputStream.
- * This method is used during the merge sort phase to read row from sort temp file.
+ * This method is used during the final merge sort phase to read row from sort temp file and
+ * merged sort temp file.
*
* @param inputStream input stream
* @return a row that contains three parts
* @throws IOException if error occrus while reading from stream
*/
- public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
+ public IntermediateSortTempRow readWithNoSortFieldConvert(
DataInputStream inputStream) throws IOException {
- int[] dictSortDims = new int[this.dictSortDimCnt];
- byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+ int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+ byte[][] noDictSortDims =
+ new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ + this.complexDimCnt][];
// read dict & sort dim data
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -221,8 +234,80 @@ public class SortStepRowHandler implements Serializable {
int len = inputStream.readInt();
byte[] noSortDimsAndMeasures = new byte[len];
inputStream.readFully(noSortDimsAndMeasures);
+ Object[] measure = new Object[this.measureCnt];
+ // unpack the no sort fields and measure fields
+ unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measure);
+ return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure);
+ }
- return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
+ private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
+ byte[][] noDictDims, Object[] measures) {
+ ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
+ // read dict_no_sort
+ for (int i = dictSortDimCnt; i < dictDims.length; i++) {
+ dictDims[i] = rowBuffer.getInt();
+ }
+
+ int noDictIndex = noDictSortDimCnt;
+ // read no_dict_no_sort
+ for (int i = 0; i < noDictNoSortDimCnt; i++) {
+ short len = rowBuffer.getShort();
+ byte[] bytes = new byte[len];
+ rowBuffer.get(bytes);
+ noDictDims[noDictIndex++] = bytes;
+ }
+
+ // read varchar dims
+ for (int i = 0; i < varcharDimCnt; i++) {
+ int len = rowBuffer.getInt();
+ byte[] bytes = new byte[len];
+ rowBuffer.get(bytes);
+ noDictDims[noDictIndex++] = bytes;
+ }
+
+ // read complex dims
+ for (int i = 0; i < complexDimCnt; i++) {
+ short len = rowBuffer.getShort();
+ byte[] bytes = new byte[len];
+ rowBuffer.get(bytes);
+ noDictDims[noDictIndex++] = bytes;
+ }
+
+ // read measure
+ int measureCnt = measures.length;
+ DataType tmpDataType;
+ Object tmpContent;
+ for (short idx = 0 ; idx < measureCnt; idx++) {
+ if ((byte) 0 == rowBuffer.get()) {
+ measures[idx] = null;
+ continue;
+ }
+
+ tmpDataType = dataTypes[idx];
+ if (DataTypes.BOOLEAN == tmpDataType) {
+ if ((byte) 1 == rowBuffer.get()) {
+ tmpContent = true;
+ } else {
+ tmpContent = false;
+ }
+ } else if (DataTypes.SHORT == tmpDataType) {
+ tmpContent = rowBuffer.getShort();
+ } else if (DataTypes.INT == tmpDataType) {
+ tmpContent = rowBuffer.getInt();
+ } else if (DataTypes.LONG == tmpDataType) {
+ tmpContent = rowBuffer.getLong();
+ } else if (DataTypes.DOUBLE == tmpDataType) {
+ tmpContent = rowBuffer.getDouble();
+ } else if (DataTypes.isDecimal(tmpDataType)) {
+ short len = rowBuffer.getShort();
+ byte[] decimalBytes = new byte[len];
+ rowBuffer.get(decimalBytes);
+ tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+ }
+ measures[idx] = tmpContent;
+ }
}
/**
@@ -298,7 +383,7 @@ public class SortStepRowHandler implements Serializable {
* @param address address of the row
* @return intermediate sort temp row
*/
- public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
+ public IntermediateSortTempRow readFromMemoryWithoutNoSortFieldConvert(Object baseObject,
long address) {
int size = 0;
@@ -333,6 +418,51 @@ public class SortStepRowHandler implements Serializable {
}
/**
+ * Read intermediate sort temp row from unsafe memory.
+ * This method is used during merge sort phase for off-heap sort.
+ *
+ * @param baseObject base object of memory block
+ * @param address address of the row
+ * @return intermediate sort temp row
+ */
+ public IntermediateSortTempRow readRowFromMemoryWithNoSortFieldConvert(Object baseObject,
+ long address) {
+ int size = 0;
+
+ int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+ byte[][] noDictSortDims =
+ new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ + this.complexDimCnt][];
+
+ // read dict & sort dim
+ for (int idx = 0; idx < dictSortDimCnt; idx++) {
+ dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ }
+
+ // read no-dict & sort dim
+ for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+ short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ byte[] bytes = new byte[length];
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+ bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+ size += length;
+ noDictSortDims[idx] = bytes;
+ }
+
+ // read no-sort dims & measures
+ int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ byte[] noSortDimsAndMeasures = new byte[len];
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+ noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
+ Object[] measures = new Object[measureCnt];
+ unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measures);
+ return new IntermediateSortTempRow(dictSortDims, noDictSortDims, measures);
+ }
+
+ /**
* Write intermediate sort temp row directly from unsafe memory to stream.
* This method is used at the late beginning of the sort phase to write in-memory pages
* to sort temp file. Comparing with reading intermediate sort temp row from memory and then
@@ -429,6 +559,8 @@ public class SortStepRowHandler implements Serializable {
return size;
}
+
+
/**
* Pack to no-sort fields to byte array
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..45cfa13 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -49,6 +49,7 @@ public class UnsafeCarbonRowPage {
private TableFieldStat tableFieldStat;
private SortStepRowHandler sortStepRowHandler;
+ private boolean convertNoSortFields;
public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
boolean saveToDisk, long taskId) {
@@ -88,8 +89,13 @@ public class UnsafeCarbonRowPage {
* @return one row
*/
public IntermediateSortTempRow getRow(long address) {
- return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
- dataBlock.getBaseObject(), address);
+ if (convertNoSortFields) {
+ return sortStepRowHandler
+ .readRowFromMemoryWithNoSortFieldConvert(dataBlock.getBaseObject(), address);
+ } else {
+ return sortStepRowHandler
+ .readFromMemoryWithoutNoSortFieldConvert(dataBlock.getBaseObject(), address);
+ }
}
/**
@@ -146,4 +152,8 @@ public class UnsafeCarbonRowPage {
public enum MemoryManagerType {
UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
}
+
+ public void setReadConvertedNoSortField() {
+ this.convertNoSortFields = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index c966ff2..102b057 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -49,6 +49,9 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
this.mergedAddresses = merger.getMergedAddresses();
this.rowPageIndexes = merger.getRowPageIndexes();
this.rowPages = merger.getUnsafeCarbonRowPages();
+ for (UnsafeCarbonRowPage rowPage: rowPages) {
+ rowPage.setReadConvertedNoSortField();
+ }
LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 8b4b550..02ffd68 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -46,6 +46,7 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
this.comparator = new IntermediateSortTempRowComparator(
rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+ this.rowPage.setReadConvertedNoSortField();
}
public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 86b7ac8..7c3c056 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -96,10 +96,12 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
private TableFieldStat tableFieldStat;
private SortStepRowHandler sortStepRowHandler;
private Comparator<IntermediateSortTempRow> comparator;
+ private boolean convertNoSortFields;
/**
* Constructor to initialize
*/
- public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
+ public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters,
+ boolean convertNoSortFields) {
// set temp file
this.tempFile = tempFile;
this.readBufferSize = parameters.getBufferSize();
@@ -108,6 +110,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors.newFixedThreadPool(1);
comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+ this.convertNoSortFields = convertNoSortFields;
initialize();
}
@@ -162,7 +165,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
fillDataForPrefetch();
} else {
try {
- this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ if (convertNoSortFields) {
+ this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+ } else {
+ this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+ }
this.numberOfObjectRead++;
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
@@ -210,9 +217,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
throws IOException {
IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
for (int i = 0; i < expected; i++) {
- IntermediateSortTempRow holder
- = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
- holders[i] = holder;
+ if (convertNoSortFields) {
+ holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+ } else {
+ holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+ }
}
this.numberOfObjectRead += expected;
return holders;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index c5b215e..0a12eda 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -209,7 +209,8 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
for (File tempFile : intermediateFiles) {
// create chunk holder
- sortTempFileChunkHolder = new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);
+ sortTempFileChunkHolder =
+ new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false);
sortTempFileChunkHolder.readRow();
this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 6defeb7..2dd2f31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -143,7 +143,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
for (final File file : filesToMergeSort) {
SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeSortTempFileChunkHolder(file, parameters);
+ new UnsafeSortTempFileChunkHolder(file, parameters, true);
// initialize
sortTempFileChunkHolder.readRow();
@@ -197,8 +197,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
*/
public Object[] next() {
if (hasNext()) {
- IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
- return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(getSortedRecordFromFile());
} else {
throw new NoSuchElementException("No more elements to return");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 364515c..35563d0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -206,7 +206,8 @@ public class IntermediateFileMerger implements Callable<Void> {
for (File tempFile : intermediateFiles) {
// create chunk holder
sortTempFileChunkHolder =
- new SortTempFileChunkHolder(tempFile, mergerParameters, mergerParameters.getTableName());
+ new SortTempFileChunkHolder(tempFile, mergerParameters, mergerParameters.getTableName(),
+ false);
// initialize
sortTempFileChunkHolder.initialize();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 09c1920..646969a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -176,7 +176,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
@Override public Void call() throws CarbonSortKeyAndGroupByException {
// create chunk holder
SortTempFileChunkHolder sortTempFileChunkHolder =
- new SortTempFileChunkHolder(tempFile, sortParameters, tableName);
+ new SortTempFileChunkHolder(tempFile, sortParameters, tableName, true);
try {
// initialize
sortTempFileChunkHolder.initialize();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 7e221a7..e39fe1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -98,6 +98,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
private TableFieldStat tableFieldStat;
private SortStepRowHandler sortStepRowHandler;
private Comparator<IntermediateSortTempRow> comparator;
+ private boolean convertToActualField;
/**
* Constructor to initialize
*
@@ -105,7 +106,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
* @param sortParameters
* @param tableName
*/
- public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) {
+ public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName,
+ boolean convertToActualField) {
// set temp file
this.tempFile = tempFile;
this.readBufferSize = sortParameters.getBufferSize();
@@ -116,6 +118,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
tableFieldStat.getIsSortColNoDictFlags());
this.executorService = Executors
.newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
+ this.convertToActualField = convertToActualField;
}
/**
@@ -168,7 +171,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
fillDataForPrefetch();
} else {
try {
- this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ if (convertToActualField) {
+ this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+ } else {
+ this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+ }
this.numberOfObjectRead++;
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e);
@@ -214,9 +221,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException {
IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
for (int i = 0; i < expected; i++) {
- IntermediateSortTempRow holder
- = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
- holders[i] = holder;
+ if (convertToActualField) {
+ holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+ } else {
+ holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+ }
}
this.numberOfObjectRead += expected;
return holders;