You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:01:55 UTC
[34/50] [abbrv] carbondata git commit: [CARBONDATA-2018][DataLoad]
Optimization in reading/writing for sort temp row
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 11b3d43..527452a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -31,15 +31,14 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
@@ -63,21 +62,15 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
* entry count
*/
private int entryCount;
-
/**
* return row
*/
- private Object[] returnRow;
- private int dimCnt;
- private int complexCnt;
- private int measureCnt;
- private boolean[] isNoDictionaryDimensionColumn;
- private DataType[] measureDataTypes;
+ private IntermediateSortTempRow returnRow;
private int readBufferSize;
private String compressorName;
- private Object[][] currentBuffer;
+ private IntermediateSortTempRow[] currentBuffer;
- private Object[][] backupBuffer;
+ private IntermediateSortTempRow[] backupBuffer;
private boolean isBackupFilled;
@@ -100,27 +93,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
private int numberOfObjectRead;
- private int nullSetWordsLength;
-
- private Comparator<Object[]> comparator;
-
+ private TableFieldStat tableFieldStat;
+ private SortStepRowHandler sortStepRowHandler;
+ private Comparator<IntermediateSortTempRow> comparator;
/**
* Constructor to initialize
*/
public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
// set temp file
this.tempFile = tempFile;
- this.dimCnt = parameters.getDimColCount();
- this.complexCnt = parameters.getComplexDimColCount();
- this.measureCnt = parameters.getMeasureColCount();
- this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
- this.measureDataTypes = parameters.getMeasureDataType();
this.readBufferSize = parameters.getBufferSize();
this.compressorName = parameters.getSortTempCompressorName();
-
+ this.tableFieldStat = new TableFieldStat(parameters);
+ this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors.newFixedThreadPool(1);
- this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
- comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
+ comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
initialize();
}
@@ -169,11 +156,17 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
*
* @throws CarbonSortKeyAndGroupByException problem while reading
*/
+ @Override
public void readRow() throws CarbonSortKeyAndGroupByException {
if (prefetch) {
fillDataForPrefetch();
} else {
- this.returnRow = getRowFromStream();
+ try {
+ this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ this.numberOfObjectRead++;
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
+ }
}
}
@@ -207,63 +200,22 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
}
/**
- * @return
- * @throws CarbonSortKeyAndGroupByException
+ * get a batch of row, this interface is used in reading compressed sort temp files
+ *
+ * @param expected expected number in a batch
+ * @return a batch of row
+ * @throws IOException if error occurs while reading from stream
*/
- private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
- Object[] row = new Object[dimCnt + measureCnt];
- try {
- int dimCount = 0;
- for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
- if (isNoDictionaryDimensionColumn[dimCount]) {
- short aShort = stream.readShort();
- byte[] col = new byte[aShort];
- stream.readFully(col);
- row[dimCount] = col;
- } else {
- int anInt = stream.readInt();
- row[dimCount] = anInt;
- }
- }
-
- // write complex dimensions here.
- for (; dimCount < dimCnt; dimCount++) {
- short aShort = stream.readShort();
- byte[] col = new byte[aShort];
- stream.readFully(col);
- row[dimCount] = col;
- }
-
- long[] words = new long[nullSetWordsLength];
- for (int i = 0; i < words.length; i++) {
- words[i] = stream.readLong();
- }
-
- for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
- if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
- DataType dataType = measureDataTypes[mesCount];
- if (dataType == DataTypes.SHORT) {
- row[dimCount + mesCount] = stream.readShort();
- } else if (dataType == DataTypes.INT) {
- row[dimCount + mesCount] = stream.readInt();
- } else if (dataType == DataTypes.LONG) {
- row[dimCount + mesCount] = stream.readLong();
- } else if (dataType == DataTypes.DOUBLE) {
- row[dimCount + mesCount] = stream.readDouble();
- } else if (DataTypes.isDecimal(dataType)) {
- short aShort = stream.readShort();
- byte[] bigDecimalInBytes = new byte[aShort];
- stream.readFully(bigDecimalInBytes);
- row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
- } else {
- throw new IllegalArgumentException("unsupported data type:" + dataType);
- }
- }
- }
- return row;
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException(e);
+ private IntermediateSortTempRow[] readBatchedRowFromStream(int expected)
+ throws IOException {
+ IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
+ for (int i = 0; i < expected; i++) {
+ IntermediateSortTempRow holder
+ = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ holders[i] = holder;
}
+ this.numberOfObjectRead += expected;
+ return holders;
}
/**
@@ -271,7 +223,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
*
* @return row
*/
- public Object[] getRow() {
+ public IntermediateSortTempRow getRow() {
return this.returnRow;
}
@@ -326,9 +278,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
@Override public int hashCode() {
int hash = 0;
- hash += 31 * measureCnt;
- hash += 31 * dimCnt;
- hash += 31 * complexCnt;
+ hash += tableFieldStat.hashCode();
hash += tempFile.hashCode();
return hash;
}
@@ -368,16 +318,12 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
/**
* This method will read the records from sort temp file and keep it in a buffer
*
- * @param numberOfRecords
- * @return
- * @throws CarbonSortKeyAndGroupByException
+ * @param numberOfRecords number of records to be read
+ * @return batch of intermediate sort temp row
+ * @throws IOException if error occurs reading records from file
*/
- private Object[][] prefetchRecordsFromFile(int numberOfRecords)
- throws CarbonSortKeyAndGroupByException {
- Object[][] records = new Object[numberOfRecords][];
- for (int i = 0; i < numberOfRecords; i++) {
- records[i] = getRowFromStream();
- }
- return records;
+ private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
+ throws IOException {
+ return readBatchedRowFromStream(numberOfRecords);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 4bbf61b..22673ff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -21,25 +21,21 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.AbstractQueue;
-import java.util.Arrays;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
@@ -69,22 +65,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
private int totalNumberOfRecords;
private SortParameters mergerParameters;
-
+ private TableFieldStat tableFieldStat;
private File[] intermediateFiles;
-
private File outPutFile;
- private int dimCnt;
- private int complexCnt;
- private int measureCnt;
- private boolean[] isNoDictionaryDimensionColumn;
- private DataType[] measureDataTypes;
private int writeBufferSize;
private String compressorName;
-
- private long[] nullSetWords;
-
- private ByteBuffer rowData;
+ private SortStepRowHandler sortStepRowHandler;
private Throwable throwable;
@@ -97,16 +84,10 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
- this.dimCnt = mergerParameters.getDimColCount();
- this.complexCnt = mergerParameters.getComplexDimColCount();
- this.measureCnt = mergerParameters.getMeasureColCount();
- this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
- this.measureDataTypes = mergerParameters.getMeasureDataType();
this.writeBufferSize = mergerParameters.getBufferSize();
this.compressorName = mergerParameters.getSortTempCompressorName();
- this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
- // Take size of 2 MB for each row. I think it is high enough to use
- rowData = ByteBuffer.allocate(2 * 1024 * 1024);
+ this.tableFieldStat = new TableFieldStat(mergerParameters);
+ this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
}
@Override public Void call() throws Exception {
@@ -165,13 +146,14 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted record from file
+ * This method will be used to get sorted sort temp row from the sort temp files
*
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
- Object[] row = null;
+ private IntermediateSortTempRow getSortedRecordFromFile()
+ throws CarbonSortKeyAndGroupByException {
+ IntermediateSortTempRow row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
@@ -235,7 +217,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
this.recordHolderHeap.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+ LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
}
/**
@@ -250,12 +232,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted row
+ * This method will be used to get the sorted sort temp row
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] next() throws CarbonSortKeyAndGroupByException {
+ private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
return getSortedRecordFromFile();
}
@@ -272,82 +254,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
- * @throws CarbonSortKeyAndGroupByException problem while writing
+ * @throws IOException problem while writing
*/
- private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
- int dimCount = 0;
- int size = 0;
- for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
- if (isNoDictionaryDimensionColumn[dimCount]) {
- byte[] col = (byte[]) row[dimCount];
- rowData.putShort((short) col.length);
- size += 2;
- rowData.put(col);
- size += col.length;
- } else {
- rowData.putInt((int) row[dimCount]);
- size += 4;
- }
- }
-
- // write complex dimensions here.
- int dimensionSize = dimCnt + complexCnt;
- for (; dimCount < dimensionSize; dimCount++) {
- byte[] col = (byte[]) row[dimCount];
- rowData.putShort((short)col.length);
- size += 2;
- rowData.put(col);
- size += col.length;
- }
- Arrays.fill(nullSetWords, 0);
- int nullSetSize = nullSetWords.length * 8;
- int nullLoc = size;
- size += nullSetSize;
- for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
- Object value = row[mesCount + dimensionSize];
- if (null != value) {
- DataType dataType = measureDataTypes[mesCount];
- if (dataType == DataTypes.SHORT) {
- rowData.putShort(size, (Short) value);
- size += 2;
- } else if (dataType == DataTypes.INT) {
- rowData.putInt(size, (Integer) value);
- size += 4;
- } else if (dataType == DataTypes.LONG) {
- rowData.putLong(size, (Long) value);
- size += 8;
- } else if (dataType == DataTypes.DOUBLE) {
- rowData.putDouble(size, (Double) value);
- size += 8;
- } else if (DataTypes.isDecimal(dataType)) {
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value));
- rowData.putShort(size, (short) bigDecimalInBytes.length);
- size += 2;
- for (int i = 0; i < bigDecimalInBytes.length; i++) {
- rowData.put(size++, bigDecimalInBytes[i]);
- }
- }
- UnsafeCarbonRowPage.set(nullSetWords, mesCount);
- } else {
- UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
- }
- }
- for (int i = 0; i < nullSetWords.length; i++) {
- rowData.putLong(nullLoc, nullSetWords[i]);
- nullLoc += 8;
- }
- byte[] rowBytes = new byte[size];
- rowData.position(0);
- rowData.get(rowBytes);
- stream.write(rowBytes);
- rowData.clear();
+ private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
+ sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
}
private void finish() throws CarbonSortKeyAndGroupByException {
clear();
try {
CarbonUtil.deleteFiles(intermediateFiles);
- rowData.clear();
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index ce118d9..64f3c25 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -29,7 +29,8 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.loading.sort.SortStepRowUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder;
@@ -55,7 +56,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
private SortParameters parameters;
- private SortStepRowUtil sortStepRowUtil;
+ private SortStepRowHandler sortStepRowHandler;
/**
* tempFileLocation
*/
@@ -68,7 +69,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
String[] tempFileLocation) {
this.parameters = parameters;
- this.sortStepRowUtil = new SortStepRowUtil(parameters);
+ this.sortStepRowHandler = new SortStepRowHandler(parameters);
this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
@@ -108,9 +109,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
LOGGER.info("Started adding first record from each page");
for (final UnsafeCarbonRowPage rowPage : rowPages) {
- SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
- parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
- .getMeasureColCount(), parameters.getNumberOfSortColumns());
+ SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage);
// initialize
sortTempFileChunkHolder.readRow();
@@ -121,9 +120,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
- .getMeasureColCount());
+ new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn());
// initialize
sortTempFileChunkHolder.readRow();
@@ -142,7 +139,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
recordHolderHeapLocal.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+ LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size());
} catch (Exception e) {
LOGGER.error(e);
throw new CarbonDataWriterException(e);
@@ -180,12 +177,14 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
}
/**
- * This method will be used to get the sorted row
+ * This method will be used to get the sorted row in 3-parted format.
+ * The row will feed the following writer process step.
*
* @return sorted row
*/
public Object[] next() {
- return sortStepRowUtil.convertRow(getSortedRecordFromFile());
+ IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+ return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
}
/**
@@ -193,8 +192,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
*
* @return sorted record sorted record
*/
- private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
- Object[] row = null;
+ private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
+ IntermediateSortTempRow row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index bfe38fd..b71612a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -389,7 +389,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
}
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
-
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
finalMerger =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 04efa1f..c06819c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -21,7 +21,6 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.math.BigDecimal;
import java.util.AbstractQueue;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
@@ -29,11 +28,9 @@ import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
public class IntermediateFileMerger implements Callable<Void> {
@@ -68,17 +65,12 @@ public class IntermediateFileMerger implements Callable<Void> {
private File[] intermediateFiles;
private File outPutFile;
- private int dimCnt;
- private int noDictDimCnt;
- private int complexCnt;
- private int measureCnt;
- private boolean[] isNoDictionaryDimensionColumn;
- private DataType[] measureDataTypes;
private int writeBufferSize;
private String compressorName;
private Throwable throwable;
-
+ private TableFieldStat tableFieldStat;
+ private SortStepRowHandler sortStepRowHandler;
/**
* IntermediateFileMerger Constructor
*/
@@ -88,14 +80,10 @@ public class IntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
- this.dimCnt = mergerParameters.getDimColCount();
- this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
- this.complexCnt = mergerParameters.getComplexDimColCount();
- this.measureCnt = mergerParameters.getMeasureColCount();
- this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
- this.measureDataTypes = mergerParameters.getMeasureDataType();
this.writeBufferSize = mergerParameters.getBufferSize();
this.compressorName = mergerParameters.getSortTempCompressorName();
+ this.tableFieldStat = new TableFieldStat(mergerParameters);
+ this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
}
@Override public Void call() throws Exception {
@@ -154,13 +142,14 @@ public class IntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted record from file
+ * This method will be used to get the sorted sort temp row from sort temp file
*
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
- Object[] row = null;
+ private IntermediateSortTempRow getSortedRecordFromFile()
+ throws CarbonSortKeyAndGroupByException {
+ IntermediateSortTempRow row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
@@ -227,7 +216,7 @@ public class IntermediateFileMerger implements Callable<Void> {
this.recordHolderHeap.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+ LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
}
/**
@@ -242,12 +231,12 @@ public class IntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted row
+ * This method will be used to get the sorted sort temp row
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] next() throws CarbonSortKeyAndGroupByException {
+ private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
return getSortedRecordFromFile();
}
@@ -264,62 +253,10 @@ public class IntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
- * @throws CarbonSortKeyAndGroupByException problem while writing
+ * @throws IOException problem while writing
*/
- private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
- try {
- int[] mdkArray = (int[]) row[0];
- byte[][] nonDictArray = (byte[][]) row[1];
- int mdkIndex = 0;
- int nonDictKeyIndex = 0;
- // write dictionary and non dictionary dimensions here.
- for (boolean nodictinary : isNoDictionaryDimensionColumn) {
- if (nodictinary) {
- byte[] col = nonDictArray[nonDictKeyIndex++];
- stream.writeShort(col.length);
- stream.write(col);
- } else {
- stream.writeInt(mdkArray[mdkIndex++]);
- }
- }
- // write complex
- for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
- byte[] col = nonDictArray[nonDictKeyIndex++];
- stream.writeShort(col.length);
- stream.write(col);
- }
- // write measure
- int fieldIndex = 0;
- for (int counter = 0; counter < measureCnt; counter++) {
- if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
- stream.write((byte) 1);
- DataType dataType = measureDataTypes[counter];
- if (dataType == DataTypes.BOOLEAN) {
- stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (dataType == DataTypes.SHORT) {
- stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (dataType == DataTypes.INT) {
- stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (dataType == DataTypes.LONG) {
- stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (dataType == DataTypes.DOUBLE) {
- stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (DataTypes.isDecimal(dataType)) {
- byte[] bigDecimalInBytes = DataTypeUtil
- .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row));
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
- } else {
- throw new IllegalArgumentException("unsupported data type:" + dataType);
- }
- } else {
- stream.write((byte) 0);
- }
- fieldIndex++;
- }
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
- }
+ private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
+ sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
}
private void finish() throws CarbonSortKeyAndGroupByException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
new file mode 100644
index 0000000..9b6d1e8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * This class is used as comparator for comparing intermediate sort temp row
+ */
+public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> {
+ /**
+ * isSortColumnNoDictionary whether the sort column is not dictionary or not
+ */
+ private boolean[] isSortColumnNoDictionary;
+
+ /**
+ * @param isSortColumnNoDictionary isSortColumnNoDictionary
+ */
+ public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
+ this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+ }
+
+ /**
+ * Below method will be used to compare two sort temp row
+ */
+ public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
+ int diff = 0;
+ int dictIndex = 0;
+ int nonDictIndex = 0;
+
+ for (boolean isNoDictionary : isSortColumnNoDictionary) {
+
+ if (isNoDictionary) {
+ byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
+ byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
+ nonDictIndex++;
+
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ int dimFieldA = rowA.getDictSortDims()[dictIndex];
+ int dimFieldB = rowB.getDictSortDims()[dictIndex];
+ dictIndex++;
+
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ }
+ return diff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index d2579d2..3f94533 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -40,14 +40,11 @@ public class NewRowComparator implements Comparator<Object[]> {
*/
public int compare(Object[] rowA, Object[] rowB) {
int diff = 0;
-
int index = 0;
for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
if (isNoDictionary) {
byte[] byteArr1 = (byte[]) rowA[index];
-
byte[] byteArr2 = (byte[]) rowB[index];
int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
@@ -57,6 +54,7 @@ public class NewRowComparator implements Comparator<Object[]> {
} else {
int dimFieldA = (int) rowA[index];
int dimFieldB = (int) rowB[index];
+
diff = dimFieldA - dimFieldB;
if (diff != 0) {
return diff;
@@ -65,7 +63,6 @@ public class NewRowComparator implements Comparator<Object[]> {
index++;
}
-
return diff;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index e01b587..7538c92 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
private int numberOfSortColumns;
/**
- * RowComparatorForNormalDims Constructor
+ * NewRowComparatorForNormalDims Constructor
*
* @param numberOfSortColumns
*/
@@ -46,7 +46,6 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
int diff = 0;
for (int i = 0; i < numberOfSortColumns; i++) {
-
int dimFieldA = (int)rowA[i];
int dimFieldB = (int)rowB[i];
diff = dimFieldA - dimFieldB;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
deleted file mode 100644
index 0ae0b93..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-public class RowComparator implements Comparator<Object[]> {
- /**
- * noDictionaryCount represent number of no dictionary cols
- */
- private int noDictionaryCount;
-
- /**
- * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
- */
- private boolean[] noDictionarySortColumnMaping;
-
- /**
- * @param noDictionarySortColumnMaping
- * @param noDictionaryCount
- */
- public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
- this.noDictionaryCount = noDictionaryCount;
- this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
- }
-
- /**
- * Below method will be used to compare two mdkey
- */
- public int compare(Object[] rowA, Object[] rowB) {
- int diff = 0;
-
- int normalIndex = 0;
- int noDictionaryindex = 0;
-
- for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
- if (isNoDictionary) {
- byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
- ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
-
- byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
- ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
-
- int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
- if (difference != 0) {
- return difference;
- }
- noDictionaryindex++;
- } else {
- int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
- int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- normalIndex++;
- }
-
- }
-
- return diff;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
deleted file mode 100644
index 0883ae1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-/**
- * This class is used as comparator for comparing dims which are non high cardinality dims.
- * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
- */
-public class RowComparatorForNormalDims implements Comparator<Object[]> {
- /**
- * dimension count
- */
- private int numberOfSortColumns;
-
- /**
- * RowComparatorForNormalDims Constructor
- *
- * @param numberOfSortColumns
- */
- public RowComparatorForNormalDims(int numberOfSortColumns) {
- this.numberOfSortColumns = numberOfSortColumns;
- }
-
- /**
- * Below method will be used to compare two surrogate keys
- *
- * @see Comparator#compare(Object, Object)
- */
- public int compare(Object[] rowA, Object[] rowB) {
- int diff = 0;
-
- for (int i = 0; i < numberOfSortColumns; i++) {
-
- int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
- int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
-
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- }
- return diff;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 88695b9..a4ac0ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -37,6 +37,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -71,12 +73,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
* tableName
*/
private String tableName;
-
+ private SortParameters sortParameters;
+ private SortStepRowHandler sortStepRowHandler;
/**
* tempFileLocation
*/
private String[] tempFileLocation;
- private SortParameters sortParameters;
private int maxThreadForSorting;
@@ -89,6 +91,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
this.tempFileLocation = tempFileLocation;
this.tableName = tableName;
this.sortParameters = sortParameters;
+ this.sortStepRowHandler = new SortStepRowHandler(sortParameters);
try {
maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
@@ -107,8 +110,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
*/
public void startFinalMerge() throws CarbonDataWriterException {
List<File> filesToMerge = getFilesToMergeSort();
- if (filesToMerge.size() == 0)
- {
+ if (filesToMerge.size() == 0) {
LOGGER.info("No file to merge in final merge stage");
return;
}
@@ -125,11 +127,9 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
// get all the merged files
List<File> files = new ArrayList<File>(tempFileLocation.length);
- for (String tempLoc : tempFileLocation)
- {
+ for (String tempLoc : tempFileLocation) {
File[] subFiles = new File(tempLoc).listFiles(fileFilter);
- if (null != subFiles && subFiles.length > 0)
- {
+ if (null != subFiles && subFiles.length > 0) {
files.addAll(Arrays.asList(subFiles));
}
}
@@ -226,13 +226,14 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
}
/**
- * This method will be used to get the sorted row
+ * This method will be used to get the sorted sort temp row from the sort temp files
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
public Object[] next() {
- return getSortedRecordFromFile();
+ IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+ return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
}
/**
@@ -241,8 +242,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
- private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
- Object[] row = null;
+ private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
+ IntermediateSortTempRow row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 57a19bd..c7efbd9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -20,7 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -32,12 +32,10 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -69,7 +67,8 @@ public class SortDataRows {
private Semaphore semaphore;
private SortParameters parameters;
-
+ private SortStepRowHandler sortStepRowHandler;
+ private ThreadLocal<ByteBuffer> rowBuffer;
private int sortBufferSize;
private SortIntermediateFileMerger intermediateFileMerger;
@@ -79,7 +78,7 @@ public class SortDataRows {
public SortDataRows(SortParameters parameters,
SortIntermediateFileMerger intermediateFileMerger) {
this.parameters = parameters;
-
+ this.sortStepRowHandler = new SortStepRowHandler(parameters);
this.intermediateFileMerger = intermediateFileMerger;
int batchSize = CarbonProperties.getInstance().getBatchSize();
@@ -87,6 +86,12 @@ public class SortDataRows {
this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
// observer of writing file in thread
this.threadStatusObserver = new ThreadStatusObserver();
+ this.rowBuffer = new ThreadLocal<ByteBuffer>() {
+ @Override protected ByteBuffer initialValue() {
+ byte[] backedArray = new byte[2 * 1024 * 1024];
+ return ByteBuffer.wrap(backedArray);
+ }
+ };
}
/**
@@ -130,8 +135,7 @@ public class SortDataRows {
semaphore.acquire();
dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
} catch (InterruptedException e) {
- LOGGER.error(e,
- "exception occurred while trying to acquire a semaphore lock: ");
+ LOGGER.error(e, "exception occurred while trying to acquire a semaphore lock: ");
throw new CarbonSortKeyAndGroupByException(e);
}
// create the new holder Array
@@ -158,7 +162,7 @@ public class SortDataRows {
}
intermediateFileMerger.startMergingIfPossible();
Object[][] recordHolderListLocal = recordHolderList;
- sizeLeft = sortBufferSize - entryCount ;
+ sizeLeft = sortBufferSize - entryCount;
if (sizeLeft > 0) {
System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
}
@@ -212,7 +216,6 @@ public class SortDataRows {
locationChosen + File.separator + parameters.getTableName() +
System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
writeDataToFile(recordHolderList, this.entryCount, file);
-
}
startFileBasedMerge();
@@ -220,7 +223,7 @@ public class SortDataRows {
}
/**
- * Below method will be used to write data to file
+ * Below method will be used to write data to sort temp file
*
* @throws CarbonSortKeyAndGroupByException problem while writing
*/
@@ -233,60 +236,9 @@ public class SortDataRows {
parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName());
// write number of entries to the file
stream.writeInt(entryCountLocal);
- int complexDimColCount = parameters.getComplexDimColCount();
- int dimColCount = parameters.getDimColCount() + complexDimColCount;
- DataType[] type = parameters.getMeasureDataType();
- boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
- Object[] row = null;
for (int i = 0; i < entryCountLocal; i++) {
- // get row from record holder list
- row = recordHolderList[i];
- int dimCount = 0;
- // write dictionary and non dictionary dimensions here.
- for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
- if (noDictionaryDimnesionMapping[dimCount]) {
- byte[] col = (byte[]) row[dimCount];
- stream.writeShort(col.length);
- stream.write(col);
- } else {
- stream.writeInt((int)row[dimCount]);
- }
- }
- // write complex dimensions here.
- for (; dimCount < dimColCount; dimCount++) {
- byte[] value = (byte[])row[dimCount];
- stream.writeShort(value.length);
- stream.write(value);
- }
- // as measures are stored in separate array.
- for (int mesCount = 0;
- mesCount < parameters.getMeasureColCount(); mesCount++) {
- Object value = row[mesCount + dimColCount];
- if (null != value) {
- stream.write((byte) 1);
- DataType dataType = type[mesCount];
- if (dataType == DataTypes.BOOLEAN) {
- stream.writeBoolean((boolean) value);
- } else if (dataType == DataTypes.SHORT) {
- stream.writeShort((Short) value);
- } else if (dataType == DataTypes.INT) {
- stream.writeInt((Integer) value);
- } else if (dataType == DataTypes.LONG) {
- stream.writeLong((Long) value);
- } else if (dataType == DataTypes.DOUBLE) {
- stream.writeDouble((Double) value);
- } else if (DataTypes.isDecimal(dataType)) {
- BigDecimal val = (BigDecimal) value;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
- } else {
- throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
- }
- } else {
- stream.write((byte) 0);
- }
- }
+ sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream(
+ recordHolderList[i], stream, rowBuffer.get());
}
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
@@ -301,7 +253,7 @@ public class SortDataRows {
*
* @throws CarbonSortKeyAndGroupByException
*/
- public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+ private void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
}
@@ -380,7 +332,8 @@ public class SortDataRows {
// intermediate merging of sort temp files will be triggered
intermediateFileMerger.addFileToMerge(sortTempFile);
LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
- System.currentTimeMillis() - startTime));
+ System.currentTimeMillis() - startTime) + ", sort temp file size in MB is "
+ + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
} catch (Throwable e) {
try {
threadStatusObserver.notifyFailed(e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index d726539..7e221a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -30,14 +31,11 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
@@ -71,20 +69,13 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
/**
* return row
*/
- private Object[] returnRow;
- private int dimCnt;
- private int noDictDimCnt;
- private int complexCnt;
- private int measureCnt;
- private boolean[] isNoDictionaryDimensionColumn;
- private boolean[] isNoDictionarySortColumn;
- private DataType[] measureDataTypes;
+ private IntermediateSortTempRow returnRow;
private int readBufferSize;
private String compressorName;
- private Object[][] currentBuffer;
+ private IntermediateSortTempRow[] currentBuffer;
- private Object[][] backupBuffer;
+ private IntermediateSortTempRow[] backupBuffer;
private boolean isBackupFilled;
@@ -104,7 +95,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
* totalRecordFetch
*/
private int totalRecordFetch;
-
+ private TableFieldStat tableFieldStat;
+ private SortStepRowHandler sortStepRowHandler;
+ private Comparator<IntermediateSortTempRow> comparator;
/**
* Constructor to initialize
*
@@ -115,16 +108,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) {
// set temp file
this.tempFile = tempFile;
- this.dimCnt = sortParameters.getDimColCount();
- this.noDictDimCnt = sortParameters.getNoDictionaryCount();
- this.complexCnt = sortParameters.getComplexDimColCount();
- this.measureCnt = sortParameters.getMeasureColCount();
- this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn();
- this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn();
- this.measureDataTypes = sortParameters.getMeasureDataType();
this.readBufferSize = sortParameters.getBufferSize();
this.compressorName = sortParameters.getSortTempCompressorName();
-
+ this.tableFieldStat = new TableFieldStat(sortParameters);
+ this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+ this.comparator = new IntermediateSortTempRowComparator(
+ tableFieldStat.getIsSortColNoDictFlags());
this.executorService = Executors
.newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
}
@@ -178,7 +167,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
if (prefetch) {
fillDataForPrefetch();
} else {
- this.returnRow = getRowFromStream();
+ try {
+ this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ this.numberOfObjectRead++;
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e);
+ }
}
}
@@ -212,86 +206,28 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
}
/**
- * Reads row from file
+ * Read a batch of row from stream
+ *
* @return Object[]
- * @throws CarbonSortKeyAndGroupByException
+ * @throws IOException if error occurs while reading from stream
*/
- private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
- Object[] holder = new Object[3];
- int index = 0;
- int nonDicIndex = 0;
- int[] dim = new int[dimCnt - noDictDimCnt];
- byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][];
- Object[] measures = new Object[measureCnt];
- try {
- // read dimension values
- for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
- if (isNoDictionaryDimensionColumn[i]) {
- short len = stream.readShort();
- byte[] array = new byte[len];
- stream.readFully(array);
- nonDicArray[nonDicIndex++] = array;
- } else {
- dim[index++] = stream.readInt();
- }
- }
-
- for (int i = 0; i < complexCnt; i++) {
- short len = stream.readShort();
- byte[] array = new byte[len];
- stream.readFully(array);
- nonDicArray[nonDicIndex++] = array;
- }
-
- index = 0;
- // read measure values
- for (int i = 0; i < measureCnt; i++) {
- if (stream.readByte() == 1) {
- DataType dataType = measureDataTypes[i];
- if (dataType == DataTypes.BOOLEAN) {
- measures[index++] = stream.readBoolean();
- } else if (dataType == DataTypes.SHORT) {
- measures[index++] = stream.readShort();
- } else if (dataType == DataTypes.INT) {
- measures[index++] = stream.readInt();
- } else if (dataType == DataTypes.LONG) {
- measures[index++] = stream.readLong();
- } else if (dataType == DataTypes.DOUBLE) {
- measures[index++] = stream.readDouble();
- } else if (DataTypes.isDecimal(dataType)) {
- int len = stream.readInt();
- byte[] buff = new byte[len];
- stream.readFully(buff);
- measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
- } else {
- throw new IllegalArgumentException("unsupported data type:" + dataType);
- }
- } else {
- measures[index++] = null;
- }
- }
-
- NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
- // increment number if record read
- this.numberOfObjectRead++;
- } catch (IOException e) {
- LOGGER.error("Problme while reading the madkey fom sort temp file");
- throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
+ private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException {
+ IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
+ for (int i = 0; i < expected; i++) {
+ IntermediateSortTempRow holder
+ = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+ holders[i] = holder;
}
-
- //return out row
- return holder;
+ this.numberOfObjectRead += expected;
+ return holders;
}
/**
- * below method will be used to get the row
+ * below method will be used to get the sort temp row
*
* @return row
*/
- public Object[] getRow() {
+ public IntermediateSortTempRow getRow() {
return this.returnRow;
}
@@ -330,31 +266,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
}
@Override public int compareTo(SortTempFileChunkHolder other) {
- int diff = 0;
- int index = 0;
- int noDictionaryIndex = 0;
- int[] leftMdkArray = (int[]) returnRow[0];
- int[] rightMdkArray = (int[]) other.returnRow[0];
- byte[][] leftNonDictArray = (byte[][]) returnRow[1];
- byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
- for (boolean isNoDictionary : isNoDictionarySortColumn) {
- if (isNoDictionary) {
- diff = UnsafeComparer.INSTANCE
- .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
- if (diff != 0) {
- return diff;
- }
- noDictionaryIndex++;
- } else {
- diff = leftMdkArray[index] - rightMdkArray[index];
- if (diff != 0) {
- return diff;
- }
- index++;
- }
-
- }
- return diff;
+ return comparator.compare(returnRow, other.getRow());
}
@Override public boolean equals(Object obj) {
@@ -372,9 +284,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
@Override public int hashCode() {
int hash = 0;
- hash += 31 * measureCnt;
- hash += 31 * dimCnt;
- hash += 31 * complexCnt;
+ hash += tableFieldStat.hashCode();
hash += tempFile.hashCode();
return hash;
}
@@ -414,16 +324,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
/**
* This method will read the records from sort temp file and keep it in a buffer
*
- * @param numberOfRecords
- * @return
- * @throws CarbonSortKeyAndGroupByException
+ * @param numberOfRecords number of records to be read
+ * @return batch of intermediate sort temp rows
+ * @throws IOException if an error occurs while reading records
*/
- private Object[][] prefetchRecordsFromFile(int numberOfRecords)
- throws CarbonSortKeyAndGroupByException {
- Object[][] records = new Object[numberOfRecords][];
- for (int i = 0; i < numberOfRecords; i++) {
- records[i] = getRowFromStream();
- }
- return records;
+ private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
+ throws IOException {
+ return readBatchedRowFromStream(numberOfRecords);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d115c479/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
new file mode 100644
index 0000000..0d1303a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * This class is used to hold field information for a table during data loading. This information
+ * will be used to convert/construct/destruct rows in the sort process step. Because a complex field
+ * is processed the same way as a no-dict-no-sort simple dimension, we treat them as the same and use
+ * `no-dict-no-sort-dim` related variables to represent them in this class.
+ */
+public class TableFieldStat implements Serializable {
+ private static final long serialVersionUID = 201712070950L;
+ private int dictSortDimCnt = 0;
+ private int dictNoSortDimCnt = 0;
+ private int noDictSortDimCnt = 0;
+ private int noDictNoSortDimCnt = 0;
+ // for each sort column: true if it is a no-dictionary column, false if dictionary
+ private boolean[] isSortColNoDictFlags;
+ private int measureCnt;
+ private DataType[] measureDataType;
+
+ // indices for dict & sort dimension columns
+ private int[] dictSortDimIdx;
+ // indices for dict & no-sort dimension columns
+ private int[] dictNoSortDimIdx;
+ // indices for no-dict & sort dimension columns
+ private int[] noDictSortDimIdx;
+ // indices for no-dict & no-sort dimension columns, including complex columns
+ private int[] noDictNoSortDimIdx;
+ // indices for measure columns
+ private int[] measureIdx;
+
+ public TableFieldStat(SortParameters sortParameters) {
+ int noDictDimCnt = sortParameters.getNoDictionaryCount();
+ int complexDimCnt = sortParameters.getComplexDimColCount();
+ int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
+ this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
+ int sortColCnt = isSortColNoDictFlags.length;
+ for (boolean flag : isSortColNoDictFlags) {
+ if (flag) {
+ noDictSortDimCnt++;
+ } else {
+ dictSortDimCnt++;
+ }
+ }
+ this.measureCnt = sortParameters.getMeasureColCount();
+ this.measureDataType = sortParameters.getMeasureDataType();
+
+ // note: int array elements default to 0, so every index slot must be assigned explicitly below
+ this.dictSortDimIdx = new int[dictSortDimCnt];
+ this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
+ this.noDictSortDimIdx = new int[noDictSortDimCnt];
+ this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
+ this.measureIdx = new int[measureCnt];
+
+ int tmpNoDictSortCnt = 0;
+ int tmpNoDictNoSortCnt = 0;
+ int tmpDictSortCnt = 0;
+ int tmpDictNoSortCnt = 0;
+ boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
+
+ for (int i = 0; i < isDimNoDictFlags.length; i++) {
+ if (isDimNoDictFlags[i]) {
+ if (i < sortColCnt && isSortColNoDictFlags[i]) {
+ noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+ } else {
+ noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+ }
+ } else {
+ if (i < sortColCnt && !isSortColNoDictFlags[i]) {
+ dictSortDimIdx[tmpDictSortCnt++] = i;
+ } else {
+ dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+ }
+ }
+ }
+ dictNoSortDimCnt = tmpDictNoSortCnt;
+
+ int base = isDimNoDictFlags.length;
+ // adding complex dimension columns
+ for (int i = 0; i < complexDimCnt; i++) {
+ noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i;
+ }
+ noDictNoSortDimCnt = tmpNoDictNoSortCnt;
+
+ base += complexDimCnt;
+ // indices for measure columns
+ for (int i = 0; i < measureCnt; i++) {
+ measureIdx[i] = base + i;
+ }
+ }
+
+ public int getDictSortDimCnt() {
+ return dictSortDimCnt;
+ }
+
+ public int getDictNoSortDimCnt() {
+ return dictNoSortDimCnt;
+ }
+
+ public int getNoDictSortDimCnt() {
+ return noDictSortDimCnt;
+ }
+
+ public int getNoDictNoSortDimCnt() {
+ return noDictNoSortDimCnt;
+ }
+
+ public boolean[] getIsSortColNoDictFlags() {
+ return isSortColNoDictFlags;
+ }
+
+ public int getMeasureCnt() {
+ return measureCnt;
+ }
+
+ public DataType[] getMeasureDataType() {
+ return measureDataType;
+ }
+
+ public int[] getDictSortDimIdx() {
+ return dictSortDimIdx;
+ }
+
+ public int[] getDictNoSortDimIdx() {
+ return dictNoSortDimIdx;
+ }
+
+ public int[] getNoDictSortDimIdx() {
+ return noDictSortDimIdx;
+ }
+
+ public int[] getNoDictNoSortDimIdx() {
+ return noDictNoSortDimIdx;
+ }
+
+ public int[] getMeasureIdx() {
+ return measureIdx;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TableFieldStat)) return false;
+ TableFieldStat that = (TableFieldStat) o;
+ return dictSortDimCnt == that.dictSortDimCnt
+ && dictNoSortDimCnt == that.dictNoSortDimCnt
+ && noDictSortDimCnt == that.noDictSortDimCnt
+ && noDictNoSortDimCnt == that.noDictNoSortDimCnt
+ && measureCnt == that.measureCnt;
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
+ noDictNoSortDimCnt, measureCnt);
+ }
+}
\ No newline at end of file