You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:22 UTC
[21/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad]
Add size-based block allocation in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..e5583c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,20 +19,35 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
/**
* It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
*/
public class UnsafeCarbonRowPage {
+
+ private boolean[] noDictionaryDimensionMapping;
+
+ private boolean[] noDictionarySortColumnMapping;
+
+ private int dimensionSize;
+
+ private int measureSize;
+
+ private DataType[] measureDataType;
+
+ private long[] nullSetWords;
+
private IntPointerBuffer buffer;
private int lastSize;
@@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage {
private long taskId;
- private TableFieldStat tableFieldStat;
- private SortStepRowHandler sortStepRowHandler;
-
- public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
- boolean saveToDisk, long taskId) {
- this.tableFieldStat = tableFieldStat;
- this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+ public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+ boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
+ MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+ this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+ this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+ this.dimensionSize = dimensionSize;
+ this.measureSize = measureSize;
+ this.measureDataType = type;
this.saveToDisk = saveToDisk;
+ this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
this.taskId = taskId;
buffer = new IntPointerBuffer(this.taskId);
this.dataBlock = memoryBlock;
@@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage {
this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
}
- public int addRow(Object[] row, ByteBuffer rowBuffer) {
- int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+ /**
+ * Appends one row to this page: serializes it at the current end of the data block,
+ * records the row's offset (relative to the block's base offset) in the pointer buffer
+ * and advances the write position by the serialized size.
+ *
+ * @param row dimension values followed by measure values
+ * @return number of bytes the serialized row occupies in the data block
+ */
+ public int addRow(Object[] row) {
+ int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
buffer.set(lastSize);
lastSize = lastSize + size;
return size;
}
- /**
- * add raw row as intermidiate sort temp row to page
- *
- * @param row
- * @param address
- * @return
- */
- private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
- return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
- dataBlock.getBaseObject(), address, rowBuffer);
+ /**
+ * Serializes one row into the data block starting at {@code address}.
+ *
+ * Layout written, in order:
+ * 1. each dimension covered by noDictionaryDimensionMapping: a no-dictionary column as a
+ * 2-byte length followed by its bytes, otherwise a 4-byte int;
+ * 2. the remaining dimensions up to dimensionSize (complex columns), each as a 2-byte
+ * length followed by its bytes;
+ * 3. a bitset of nullSetWords.length longs in which bit i is SET when measure i is
+ * non-null (despite the field name);
+ * 4. every non-null measure, packed by type: BOOLEAN 1 byte, SHORT 2, INT 4, LONG 8,
+ * DOUBLE 8, decimal as a 2-byte length followed by its bytes.
+ *
+ * NOTE(review): lengths are stored through a (short) cast, so a value longer than
+ * Short.MAX_VALUE bytes would be recorded with a wrong length.
+ *
+ * @param row dimension values followed by measure values
+ * @param address address to write at (the data block's base offset is already included)
+ * @return number of bytes written
+ * @throws RuntimeException if row is null
+ * @throws IllegalArgumentException for an unsupported measure data type
+ */
+ private int addRow(Object[] row, long address) {
+ if (row == null) {
+ throw new RuntimeException("Row is null ??");
+ }
+ int dimCount = 0;
+ int size = 0;
+ Object baseObject = dataBlock.getBaseObject();
+ // dimensions described by the no-dictionary mapping: byte[] for no-dictionary, int otherwise
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe()
+ .putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ } else {
+ int value = (int) row[dimCount];
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+ size += 4;
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ }
+ // reserve room for the measure bitset now; it is copied in once all measures are written
+ Arrays.fill(nullSetWords, 0);
+ int nullSetSize = nullSetWords.length * 8;
+ int nullWordLoc = size;
+ size += nullSetSize;
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ Object value = row[mesCount + dimensionSize];
+ if (null != value) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ Boolean bval = (Boolean) value;
+ CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
+ size += 1;
+ } else if (dataType == DataTypes.SHORT) {
+ Short sval = (Short) value;
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+ size += 2;
+ } else if (dataType == DataTypes.INT) {
+ Integer ival = (Integer) value;
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+ size += 4;
+ } else if (dataType == DataTypes.LONG) {
+ Long val = (Long) value;
+ CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+ size += 8;
+ } else if (dataType == DataTypes.DOUBLE) {
+ Double doubleVal = (Double) value;
+ CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+ size += 8;
+ } else if (DataTypes.isDecimal(dataType)) {
+ BigDecimal decimalVal = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+ CarbonUnsafe.getUnsafe()
+ .putShort(baseObject, address + size, (short) bigDecimalInBytes.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ set(nullSetWords, mesCount);
+ } else {
+ // nothing is written for a null measure; its bit stays clear
+ unset(nullSetWords, mesCount);
+ }
+ }
+ // copy the completed bitset into the slot reserved before the measures
+ CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+ address + nullWordLoc, nullSetSize);
+ return size;
}
- /**
- * get one row from memory address
- * @param address address
- * @return one row
- */
- public IntermediateSortTempRow getRow(long address) {
- return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
- dataBlock.getBaseObject(), address);
+ /**
+ * Deserializes the row stored at {@code address} into {@code rowToFill}, reading the
+ * layout written by addRow(Object[], long). Measures whose bit is clear in the stored
+ * bitset are filled with null.
+ *
+ * Uses the nullSetWords field as scratch space, so concurrent calls on the same page
+ * would interfere with each other.
+ *
+ * @param address address at which addRow wrote the row
+ * @param rowToFill array receiving dimensions first, then measures; needs room for
+ * dimensionSize + measureSize entries
+ * @return rowToFill
+ * @throws IllegalArgumentException for an unsupported measure data type
+ */
+ public Object[] getRow(long address, Object[] rowToFill) {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimCount] = anInt;
+ }
+ }
+
+ // read complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ }
+
+ // load the measure bitset (bit set = measure present) that follows the dimensions
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+ size += 1;
+ rowToFill[dimensionSize + mesCount] = bval;
+ } else if (dataType == DataTypes.SHORT) {
+ Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ rowToFill[dimensionSize + mesCount] = sval;
+ } else if (dataType == DataTypes.INT) {
+ Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimensionSize + mesCount] = ival;
+ } else if (dataType == DataTypes.LONG) {
+ Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ } else if (dataType == DataTypes.DOUBLE) {
+ Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = doubleVal;
+ } else if (DataTypes.isDecimal(dataType)) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ } else {
+ rowToFill[dimensionSize + mesCount] = null;
+ }
+ }
+ return rowToFill;
}
- /**
- * write a row to stream
- * @param address address of a row
- * @param stream stream
- * @throws IOException
- */
- public void writeRow(long address, DataOutputStream stream) throws IOException {
- sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
- dataBlock.getBaseObject(), address, stream);
+ /**
+ * Copies the row stored at {@code address} to {@code stream} (sort temp file format).
+ * Fields are written in the in-memory order produced by addRow: length-prefixed byte
+ * arrays as short + bytes, dictionary values as ints, the measure bitset as
+ * nullSetWords.length longs, then every measure whose bit is set.
+ *
+ * Uses the nullSetWords field as scratch space, so concurrent calls on the same page
+ * would interfere with each other.
+ *
+ * @param address row address (data block base offset plus the row's offset)
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ * @throws IllegalArgumentException for an unsupported measure data type
+ */
+ public void fillRow(long address, DataOutputStream stream) throws IOException {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(anInt);
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ }
+
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+ for (int i = 0; i < nullSetWords.length; i++) {
+ stream.writeLong(nullSetWords[i]);
+ }
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ DataType dataType = measureDataType[mesCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ // addRow stores BOOLEAN measures as one byte; without this branch a page holding a
+ // BOOLEAN measure could not be spilled to a sort temp file.
+ // NOTE(review): the sort temp file reader must read this back with readBoolean().
+ boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+ size += 1;
+ stream.writeBoolean(bval);
+ } else if (dataType == DataTypes.SHORT) {
+ short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ stream.writeShort(sval);
+ } else if (dataType == DataTypes.INT) {
+ int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(ival);
+ } else if (dataType == DataTypes.LONG) {
+ long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ stream.writeLong(val);
+ } else if (dataType == DataTypes.DOUBLE) {
+ double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ stream.writeDouble(doubleVal);
+ } else if (DataTypes.isDecimal(dataType)) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ stream.writeShort(aShort);
+ stream.write(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
+ }
+ }
+ }
}
public void freeMemory() {
@@ -134,8 +362,27 @@ public class UnsafeCarbonRowPage {
return dataBlock;
}
- public TableFieldStat getTableFieldStat() {
- return tableFieldStat;
+ /**
+ * Sets bit {@code index} in the bitset {@code words} (64 bits per long word).
+ */
+ public static void set(long[] words, int index) {
+ words[index >> 6] |= bitMask(index);
+ }
+
+ /**
+ * Clears bit {@code index} in the bitset {@code words} (64 bits per long word).
+ */
+ public static void unset(long[] words, int index) {
+ words[index >> 6] &= ~bitMask(index);
+ }
+
+ /**
+ * Tells whether bit {@code index} is set in the bitset {@code words}.
+ */
+ public static boolean isSet(long[] words, int index) {
+ long word = words[index >> 6];
+ return (word & bitMask(index)) != 0L;
+ }
+
+ /**
+ * Single-bit mask for {@code index} inside its word. The explicit {@code & 63} is the
+ * same masking the JLS already applies to the distance of a long shift.
+ */
+ private static long bitMask(int index) {
+ return 1L << (index & 63);
+ }
+
+ /** @return per-dimension flags; true marks a dimension stored as a length-prefixed byte[] */
+ public boolean[] getNoDictionaryDimensionMapping() {
+ return noDictionaryDimensionMapping;
+ }
+
+ /** @return per-sort-column flags; true marks a sort column stored as a length-prefixed byte[] */
+ public boolean[] getNoDictionarySortColumnMapping() {
+ return noDictionarySortColumnMapping;
}
public void setNewDataBlock(MemoryBlock newMemoryBlock) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..4dd5e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -42,14 +41,13 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
public class UnsafeSortDataRows {
@@ -71,8 +69,7 @@ public class UnsafeSortDataRows {
*/
private SortParameters parameters;
- private TableFieldStat tableFieldStat;
- private ThreadLocal<ByteBuffer> rowBuffer;
+
private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
private UnsafeCarbonRowPage rowPage;
@@ -97,13 +94,7 @@ public class UnsafeSortDataRows {
public UnsafeSortDataRows(SortParameters parameters,
UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
this.parameters = parameters;
- this.tableFieldStat = new TableFieldStat(parameters);
- this.rowBuffer = new ThreadLocal<ByteBuffer>() {
- @Override protected ByteBuffer initialValue() {
- byte[] backedArray = new byte[2 * 1024 * 1024];
- return ByteBuffer.wrap(backedArray);
- }
- };
+
this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
// observer of writing file in thread
@@ -136,7 +127,11 @@ public class UnsafeSortDataRows {
if (isMemoryAvailable) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
}
- this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
+ this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
+ !isMemoryAvailable, taskId);
// Delete if any older file exists in sort temp folder
deleteSortLocationIfExists();
@@ -183,7 +178,7 @@ public class UnsafeSortDataRows {
private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
for (int i = 0; i < size; i++) {
if (rowPage.canAdd()) {
- bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+ bytesAdded += rowPage.addRow(rowBatch[i]);
} else {
try {
if (enableInMemoryIntermediateMerge) {
@@ -199,8 +194,15 @@ public class UnsafeSortDataRows {
if (!saveToDisk) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
}
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
- bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(),
+ memoryBlock,
+ saveToDisk, taskId);
+ bytesAdded += rowPage.addRow(rowBatch[i]);
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -218,7 +220,7 @@ public class UnsafeSortDataRows {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
if (rowPage.canAdd()) {
- rowPage.addRow(row, rowBuffer.get());
+ rowPage.addRow(row);
} else {
try {
if (enableInMemoryIntermediateMerge) {
@@ -233,8 +235,16 @@ public class UnsafeSortDataRows {
if (!saveToDisk) {
UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
}
- rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
- rowPage.addRow(row, rowBuffer.get());
+ // The page's dimension count must include complex dimensions, as in the other page
+ // constructions in this class; otherwise addRow would read measures from
+ // row[mesCount + dimensionSize] at the wrong indices when complex columns exist.
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(), memoryBlock,
+ saveToDisk, taskId);
+ rowPage.addRow(row);
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -262,7 +269,7 @@ public class UnsafeSortDataRows {
new UnsafeRowComparator(rowPage));
} else {
timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDims(rowPage));
+ new UnsafeRowComparatorForNormalDIms(rowPage));
}
unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
} else {
@@ -288,9 +295,10 @@ public class UnsafeSortDataRows {
// write number of entries to the file
stream.writeInt(actualSize);
for (int i = 0; i < actualSize; i++) {
- rowPage.writeRow(
- rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream);
+ rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
+ stream);
}
+
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
} finally {
@@ -359,7 +367,7 @@ public class UnsafeSortDataRows {
new UnsafeRowComparator(page));
} else {
timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDims(page));
+ new UnsafeRowComparatorForNormalDIms(page));
}
if (page.isSaveToDisk()) {
// create a new file every time
@@ -372,8 +380,7 @@ public class UnsafeSortDataRows {
writeDataToFile(page, sortTempFile);
LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+ " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
- + sortTempFile + ", sort temp file size in MB is "
- + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
+ + sortTempFile);
page.freeMemory();
// add sort temp filename to and arrayList. When the list size reaches 20 then
// intermediate merging of sort temp files will be triggered
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 33342dc..d02be9b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -23,25 +23,63 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+ /**
+ * mapping of dictionary and no dictionary of sort_columns.
+ */
+ private boolean[] noDictionarySortColumnMaping;
+
private Object baseObject;
- private TableFieldStat tableFieldStat;
- private int dictSizeInMemory;
+ /**
+ * Creates a comparator for rows that live in {@code rowPage}'s data block.
+ *
+ * @param rowPage page providing the data block base object and the sort-column
+ * no-dictionary mapping used during comparison
+ */
public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+ this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
this.baseObject = rowPage.getDataBlock().getBaseObject();
- this.tableFieldStat = rowPage.getTableFieldStat();
- this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
- + tableFieldStat.getDictNoSortDimCnt()) * 4;
}
/**
* Below method will be used to compare two mdkey
*/
public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+ // Both rows are read from this page's data block. Delegate to the overload that takes
+ // explicit base objects: it walks the same sort-column layout (length-prefixed bytes
+ // for no-dictionary columns, 4-byte ints otherwise), so it is not duplicated here.
return compare(rowL, baseObject, rowR, baseObject);
}
/**
@@ -52,40 +90,35 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
int diff = 0;
long rowA = rowL.address;
long rowB = rowR.address;
- int sizeInDictPartA = 0;
-
- int sizeInNonDictPartA = 0;
- int sizeInDictPartB = 0;
- int sizeInNonDictPartB = 0;
- for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) {
+ int sizeA = 0;
+ int sizeB = 0;
+ for (boolean isNoDictionary : noDictionarySortColumnMaping) {
if (isNoDictionary) {
- short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
- rowA + dictSizeInMemory + sizeInNonDictPartA);
- byte[] byteArr1 = new byte[lengthA];
- sizeInNonDictPartA += 2;
+ short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
+ byte[] byteArr1 = new byte[aShort1];
+ sizeA += 2;
CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
- byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
- sizeInNonDictPartA += lengthA;
+ .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort1);
+ sizeA += aShort1;
- short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
- rowB + dictSizeInMemory + sizeInNonDictPartB);
- byte[] byteArr2 = new byte[lengthB];
- sizeInNonDictPartB += 2;
+ short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
+ byte[] byteArr2 = new byte[aShort2];
+ sizeB += 2;
CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
- byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
- sizeInNonDictPartB += lengthB;
+ .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort2);
+ sizeB += aShort2;
int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
if (difference != 0) {
return difference;
}
} else {
- int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);
- sizeInDictPartA += 4;
- int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeInDictPartB);
- sizeInDictPartB += 4;
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
+ sizeB += 4;
diff = dimFieldA - dimFieldB;
if (diff != 0) {
return diff;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+ /** Base object of the page's data block; both compared rows are read from it. */
+ private Object baseObject;
+
+ /** Number of leading 4-byte int fields compared for each row. */
+ private int numberOfSortColumns;
+
+ public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+ this.baseObject = rowPage.getDataBlock().getBaseObject();
+ this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
+ }
+
+ /**
+ * Compares two rows over the first numberOfSortColumns 4-byte ints at the start of
+ * each row and returns the first non-zero difference, or 0 when all are equal.
+ * NOTE(review): assumes every sort column is dictionary encoded (stored as an int at
+ * the row start); presumably the caller only picks this comparator in that case.
+ */
+ @Override
+ public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+ long leftAddress = rowL.address;
+ long rightAddress = rowR.address;
+ int offset = 0;
+ for (int column = 0; column < numberOfSortColumns; column++, offset += 4) {
+ int leftValue = CarbonUnsafe.getUnsafe().getInt(baseObject, leftAddress + offset);
+ int rightValue = CarbonUnsafe.getUnsafe().getInt(baseObject, rightAddress + offset);
+ int difference = leftValue - rightValue;
+ if (difference != 0) {
+ return difference;
+ }
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
deleted file mode 100644
index e9cfb1c..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-
-public class UnsafeRowComparatorForNormalDims implements Comparator<UnsafeCarbonRow> {
-
- private Object baseObject;
-
- private int numberOfSortColumns;
-
- public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) {
- this.baseObject = rowPage.getDataBlock().getBaseObject();
- this.numberOfSortColumns = rowPage.getTableFieldStat().getIsSortColNoDictFlags().length;
- }
-
- /**
- * Below method will be used to compare two mdkey
- */
- public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
- int diff = 0;
- long rowA = rowL.address;
- long rowB = rowR.address;
- int sizeA = 0;
- int sizeB = 0;
- for (int i = 0; i < numberOfSortColumns; i++) {
- int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
- sizeA += 4;
- int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
- sizeB += 4;
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- }
-
- return diff;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
index d790c41..686e855 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.processing.loading.sort.unsafe.holder;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
/**
@@ -29,7 +28,7 @@ public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
void readRow() throws CarbonSortKeyAndGroupByException;
- IntermediateSortTempRow getRow();
+ Object[] getRow();
int numberOfRows();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index a776db1..6b0cfa6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,10 +19,9 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
@@ -39,18 +38,21 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
private UnsafeCarbonRowPage[] rowPages;
- private IntermediateSortTempRowComparator comparator;
+ private NewRowComparator comparator;
- private IntermediateSortTempRow currentRow;
+ private Object[] currentRow;
+
+ private int columnSize;
public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
- boolean[] noDictSortColumnMapping) {
+ boolean[] noDictSortColumnMapping, int columnSize) {
this.actualSize = merger.getEntryCount();
this.mergedAddresses = merger.getMergedAddresses();
this.rowPageIndexes = merger.getRowPageIndexes();
this.rowPages = merger.getUnsafeCarbonRowPages();
LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+ this.comparator = new NewRowComparator(noDictSortColumnMapping);
+ this.columnSize = columnSize;
}
public boolean hasNext() {
@@ -61,11 +63,12 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
}
public void readRow() {
- currentRow = rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]);
+ currentRow = new Object[columnSize];
+ rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
counter++;
}
- public IntermediateSortTempRow getRow() {
+ public Object[] getRow() {
return currentRow;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index cbcbbae..6f05088 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -19,9 +19,8 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
public class UnsafeInmemoryHolder implements SortTempChunkHolder {
@@ -34,18 +33,21 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
private UnsafeCarbonRowPage rowPage;
- private IntermediateSortTempRow currentRow;
+ private Object[] currentRow;
private long address;
- private IntermediateSortTempRowComparator comparator;
+ private NewRowComparator comparator;
- public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
+ private int columnSize;
+
+ public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+ int numberOfSortColumns) {
this.actualSize = rowPage.getBuffer().getActualSize();
this.rowPage = rowPage;
LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new IntermediateSortTempRowComparator(
- rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+ this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+ this.columnSize = columnSize;
}
public boolean hasNext() {
@@ -56,12 +58,13 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
}
public void readRow() {
+ currentRow = new Object[columnSize];
address = rowPage.getBuffer().get(counter);
- currentRow = rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset());
+ rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
counter++;
}
- public IntermediateSortTempRow getRow() {
+ public Object[] getRow() {
return currentRow;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 527452a..11b3d43 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -31,14 +31,15 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
@@ -62,15 +63,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
* entry count
*/
private int entryCount;
+
/**
* return row
*/
- private IntermediateSortTempRow returnRow;
+ private Object[] returnRow;
+ private int dimCnt;
+ private int complexCnt;
+ private int measureCnt;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private DataType[] measureDataTypes;
private int readBufferSize;
private String compressorName;
- private IntermediateSortTempRow[] currentBuffer;
+ private Object[][] currentBuffer;
- private IntermediateSortTempRow[] backupBuffer;
+ private Object[][] backupBuffer;
private boolean isBackupFilled;
@@ -93,21 +100,27 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
private int numberOfObjectRead;
- private TableFieldStat tableFieldStat;
- private SortStepRowHandler sortStepRowHandler;
- private Comparator<IntermediateSortTempRow> comparator;
+ private int nullSetWordsLength;
+
+ private Comparator<Object[]> comparator;
+
/**
* Constructor to initialize
*/
public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
// set temp file
this.tempFile = tempFile;
+ this.dimCnt = parameters.getDimColCount();
+ this.complexCnt = parameters.getComplexDimColCount();
+ this.measureCnt = parameters.getMeasureColCount();
+ this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+ this.measureDataTypes = parameters.getMeasureDataType();
this.readBufferSize = parameters.getBufferSize();
this.compressorName = parameters.getSortTempCompressorName();
- this.tableFieldStat = new TableFieldStat(parameters);
- this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+
this.executorService = Executors.newFixedThreadPool(1);
- comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+ this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
+ comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
initialize();
}
@@ -156,17 +169,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
*
* @throws CarbonSortKeyAndGroupByException problem while reading
*/
- @Override
public void readRow() throws CarbonSortKeyAndGroupByException {
if (prefetch) {
fillDataForPrefetch();
} else {
- try {
- this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
- this.numberOfObjectRead++;
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
- }
+ this.returnRow = getRowFromStream();
}
}
@@ -200,22 +207,63 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
}
/**
- * get a batch of row, this interface is used in reading compressed sort temp files
- *
- * @param expected expected number in a batch
- * @return a batch of row
- * @throws IOException if error occurs while reading from stream
+ * @return
+ * @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow[] readBatchedRowFromStream(int expected)
- throws IOException {
- IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
- for (int i = 0; i < expected; i++) {
- IntermediateSortTempRow holder
- = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
- holders[i] = holder;
+ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+ Object[] row = new Object[dimCnt + measureCnt];
+ try {
+ int dimCount = 0;
+ for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+ if (isNoDictionaryDimensionColumn[dimCount]) {
+ short aShort = stream.readShort();
+ byte[] col = new byte[aShort];
+ stream.readFully(col);
+ row[dimCount] = col;
+ } else {
+ int anInt = stream.readInt();
+ row[dimCount] = anInt;
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimCnt; dimCount++) {
+ short aShort = stream.readShort();
+ byte[] col = new byte[aShort];
+ stream.readFully(col);
+ row[dimCount] = col;
+ }
+
+ long[] words = new long[nullSetWordsLength];
+ for (int i = 0; i < words.length; i++) {
+ words[i] = stream.readLong();
+ }
+
+ for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
+ if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
+ DataType dataType = measureDataTypes[mesCount];
+ if (dataType == DataTypes.SHORT) {
+ row[dimCount + mesCount] = stream.readShort();
+ } else if (dataType == DataTypes.INT) {
+ row[dimCount + mesCount] = stream.readInt();
+ } else if (dataType == DataTypes.LONG) {
+ row[dimCount + mesCount] = stream.readLong();
+ } else if (dataType == DataTypes.DOUBLE) {
+ row[dimCount + mesCount] = stream.readDouble();
+ } else if (DataTypes.isDecimal(dataType)) {
+ short aShort = stream.readShort();
+ byte[] bigDecimalInBytes = new byte[aShort];
+ stream.readFully(bigDecimalInBytes);
+ row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + dataType);
+ }
+ }
+ }
+ return row;
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException(e);
}
- this.numberOfObjectRead += expected;
- return holders;
}
/**
@@ -223,7 +271,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
*
* @return row
*/
- public IntermediateSortTempRow getRow() {
+ public Object[] getRow() {
return this.returnRow;
}
@@ -278,7 +326,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
@Override public int hashCode() {
int hash = 0;
- hash += tableFieldStat.hashCode();
+ hash += 31 * measureCnt;
+ hash += 31 * dimCnt;
+ hash += 31 * complexCnt;
hash += tempFile.hashCode();
return hash;
}
@@ -318,12 +368,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
/**
* This method will read the records from sort temp file and keep it in a buffer
*
- * @param numberOfRecords number of records to be read
- * @return batch of intermediate sort temp row
- * @throws IOException if error occurs reading records from file
+ * @param numberOfRecords
+ * @return
+ * @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
- throws IOException {
- return readBatchedRowFromStream(numberOfRecords);
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    // decode the requested number of rows one after another from the stream
+    Object[][] batch = new Object[numberOfRecords][];
+    int filled = 0;
+    while (filled < batch.length) {
+      batch[filled++] = getRowFromStream();
+    }
+    return batch;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 22673ff..4bbf61b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -21,21 +21,25 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.AbstractQueue;
+import java.util.Arrays;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
@@ -65,13 +69,22 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
private int totalNumberOfRecords;
private SortParameters mergerParameters;
- private TableFieldStat tableFieldStat;
+
private File[] intermediateFiles;
+
private File outPutFile;
+ private int dimCnt;
+ private int complexCnt;
+ private int measureCnt;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private DataType[] measureDataTypes;
private int writeBufferSize;
private String compressorName;
- private SortStepRowHandler sortStepRowHandler;
+
+ private long[] nullSetWords;
+
+ private ByteBuffer rowData;
private Throwable throwable;
@@ -84,10 +97,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
+ this.dimCnt = mergerParameters.getDimColCount();
+ this.complexCnt = mergerParameters.getComplexDimColCount();
+ this.measureCnt = mergerParameters.getMeasureColCount();
+ this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+ this.measureDataTypes = mergerParameters.getMeasureDataType();
this.writeBufferSize = mergerParameters.getBufferSize();
this.compressorName = mergerParameters.getSortTempCompressorName();
- this.tableFieldStat = new TableFieldStat(mergerParameters);
- this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+ this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
+ // Take size of 2 MB for each row. I think it is high enough to use
+ rowData = ByteBuffer.allocate(2 * 1024 * 1024);
}
@Override public Void call() throws Exception {
@@ -146,14 +165,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get sorted sort temp row from the sort temp files
+ * This method will be used to get the sorted record from file
*
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow getSortedRecordFromFile()
- throws CarbonSortKeyAndGroupByException {
- IntermediateSortTempRow row = null;
+ private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+ Object[] row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
@@ -217,7 +235,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
this.recordHolderHeap.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
+ LOGGER.info("Heap Size" + this.recordHolderHeap.size());
}
/**
@@ -232,12 +250,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted sort temp row
+ * This method will be used to get the sorted row
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
+ private Object[] next() throws CarbonSortKeyAndGroupByException {
return getSortedRecordFromFile();
}
@@ -254,16 +272,82 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
- * @throws IOException problem while writing
+ * @throws CarbonSortKeyAndGroupByException problem while writing
*/
- private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
- sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
+    // Serialized layout (big-endian, the ByteBuffer default):
+    //   1. one entry per isNoDictionaryDimensionColumn flag: short length + bytes when the flag
+    //      is set, otherwise a 4-byte int;
+    //   2. short length + bytes for each remaining dimension up to dimCnt + complexCnt;
+    //   3. the null-bit words, one bit per measure, set when the measure value is non-null;
+    //   4. every non-null measure, encoded according to measureDataTypes.
+    // All writes are absolute, so the bytes to emit are exactly [0, size) of rowData.
+    int dimCount = 0;
+    int size = 0;
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      if (isNoDictionaryDimensionColumn[dimCount]) {
+        size = putLengthPrefixedBytes(size, (byte[]) row[dimCount]);
+      } else {
+        ensureRowDataCapacity(size + 4);
+        rowData.putInt(size, (int) row[dimCount]);
+        size += 4;
+      }
+    }
+
+    // write complex dimensions here.
+    // NOTE(review): UnsafeSortTempFileChunkHolder#getRowFromStream stops its dimension loop at
+    // dimCnt, not dimCnt + complexCnt; confirm both sides agree on what getDimColCount() covers.
+    int dimensionSize = dimCnt + complexCnt;
+    for (; dimCount < dimensionSize; dimCount++) {
+      size = putLengthPrefixedBytes(size, (byte[]) row[dimCount]);
+    }
+
+    // reserve room for the null-bit words; they are filled in once all measures are known
+    Arrays.fill(nullSetWords, 0);
+    int nullLoc = size;
+    size += nullSetWords.length * 8;
+    ensureRowDataCapacity(size);
+    for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
+      Object value = row[mesCount + dimensionSize];
+      if (null == value) {
+        // bit stays cleared (Arrays.fill above): the reader skips this measure
+        continue;
+      }
+      DataType dataType = measureDataTypes[mesCount];
+      if (dataType == DataTypes.SHORT) {
+        ensureRowDataCapacity(size + 2);
+        rowData.putShort(size, (Short) value);
+        size += 2;
+      } else if (dataType == DataTypes.INT) {
+        ensureRowDataCapacity(size + 4);
+        rowData.putInt(size, (Integer) value);
+        size += 4;
+      } else if (dataType == DataTypes.LONG) {
+        ensureRowDataCapacity(size + 8);
+        rowData.putLong(size, (Long) value);
+        size += 8;
+      } else if (dataType == DataTypes.DOUBLE) {
+        ensureRowDataCapacity(size + 8);
+        rowData.putDouble(size, (Double) value);
+        size += 8;
+      } else if (DataTypes.isDecimal(dataType)) {
+        size = putLengthPrefixedBytes(size, DataTypeUtil.bigDecimalToByte((BigDecimal) value));
+      } else {
+        // setting the present bit without a payload would yield bytes the reader cannot parse;
+        // fail fast with the same message the reader uses
+        throw new IllegalArgumentException("unsupported data type:" + dataType);
+      }
+      UnsafeCarbonRowPage.set(nullSetWords, mesCount);
+    }
+    for (int i = 0; i < nullSetWords.length; i++) {
+      rowData.putLong(nullLoc + i * 8, nullSetWords[i]);
+    }
+    // write straight from the backing array instead of copying into a temporary byte[]
+    stream.write(rowData.array(), rowData.arrayOffset(), size);
+    rowData.clear();
}
+
+  /**
+   * Writes a short length followed by {@code bytes} into rowData at {@code offset}.
+   *
+   * @return the offset just past the written bytes
+   */
+  private int putLengthPrefixedBytes(int offset, byte[] bytes) {
+    ensureRowDataCapacity(offset + 2 + bytes.length);
+    rowData.putShort(offset, (short) bytes.length);
+    System.arraycopy(bytes, 0, rowData.array(), rowData.arrayOffset() + offset + 2, bytes.length);
+    return offset + 2 + bytes.length;
+  }
+
+  /**
+   * Grows rowData, keeping every byte already written, so that it holds at least
+   * {@code required} bytes. Replaces the former hard 2 MB per-row limit.
+   */
+  private void ensureRowDataCapacity(int required) {
+    if (required <= rowData.capacity()) {
+      return;
+    }
+    // capacity * 2 may overflow for huge buffers; Math.max then falls back to required
+    ByteBuffer grown = ByteBuffer.allocate(Math.max(required, rowData.capacity() * 2));
+    System.arraycopy(rowData.array(), rowData.arrayOffset(), grown.array(), 0,
+        rowData.capacity());
+    rowData = grown;
+  }
private void finish() throws CarbonSortKeyAndGroupByException {
clear();
try {
CarbonUtil.deleteFiles(intermediateFiles);
+ rowData.clear();
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 64f3c25..ce118d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -29,8 +29,7 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder;
@@ -56,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
private SortParameters parameters;
- private SortStepRowHandler sortStepRowHandler;
+ private SortStepRowUtil sortStepRowUtil;
/**
* tempFileLocation
*/
@@ -69,7 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
String[] tempFileLocation) {
this.parameters = parameters;
- this.sortStepRowHandler = new SortStepRowHandler(parameters);
+ this.sortStepRowUtil = new SortStepRowUtil(parameters);
this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
@@ -109,7 +108,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
LOGGER.info("Started adding first record from each page");
for (final UnsafeCarbonRowPage rowPage : rowPages) {
- SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage);
+ SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
+ parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+ .getMeasureColCount(), parameters.getNumberOfSortColumns());
// initialize
sortTempFileChunkHolder.readRow();
@@ -120,7 +121,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn());
+ new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+ .getMeasureColCount());
// initialize
sortTempFileChunkHolder.readRow();
@@ -139,7 +142,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
recordHolderHeapLocal.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size());
+ LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
} catch (Exception e) {
LOGGER.error(e);
throw new CarbonDataWriterException(e);
@@ -177,14 +180,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
}
/**
- * This method will be used to get the sorted row in 3-parted format.
- * The row will feed the following writer process step.
+ * This method will be used to get the sorted row
*
* @return sorted row
*/
public Object[] next() {
- IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
- return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ return sortStepRowUtil.convertRow(getSortedRecordFromFile());
}
/**
@@ -192,8 +193,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
*
* @return sorted record sorted record
*/
- private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
- IntermediateSortTempRow row = null;
+ private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+ Object[] row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 4579c85..1ab803b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1005,14 +1005,8 @@ public final class CarbonDataMergerUtil {
/**
* This method traverses Update Delta Files inside the seg and return true
* if UpdateDelta Files are more than IUD Compaction threshold.
- *
- * @param seg
- * @param identifier
- * @param segmentUpdateStatusManager
- * @param numberDeltaFilesThreshold
- * @return
*/
- public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
+ private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager,
int numberDeltaFilesThreshold) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index bce1b33..038d34c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -389,6 +389,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
}
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
+
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
finalMerger =
@@ -406,8 +407,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
} else {
carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
- tableName, carbonLoadModel.getSegmentId());
+ .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName,
+ carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 4aca13a..2616def 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -76,9 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
.getFactTimeStamp() + ".tmp";
} else {
- carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getSegmentId());
+ carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 92db4c5..221697f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,9 +47,8 @@ public class RowResultProcessor {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
- String carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getSegmentId());
+ String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index c06819c..04efa1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.AbstractQueue;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
@@ -28,9 +29,11 @@ import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
public class IntermediateFileMerger implements Callable<Void> {
@@ -65,12 +68,17 @@ public class IntermediateFileMerger implements Callable<Void> {
private File[] intermediateFiles;
private File outPutFile;
+ private int dimCnt;
+ private int noDictDimCnt;
+ private int complexCnt;
+ private int measureCnt;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private DataType[] measureDataTypes;
private int writeBufferSize;
private String compressorName;
private Throwable throwable;
- private TableFieldStat tableFieldStat;
- private SortStepRowHandler sortStepRowHandler;
+
/**
* IntermediateFileMerger Constructor
*/
@@ -80,10 +88,14 @@ public class IntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
+ this.dimCnt = mergerParameters.getDimColCount();
+ this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
+ this.complexCnt = mergerParameters.getComplexDimColCount();
+ this.measureCnt = mergerParameters.getMeasureColCount();
+ this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+ this.measureDataTypes = mergerParameters.getMeasureDataType();
this.writeBufferSize = mergerParameters.getBufferSize();
this.compressorName = mergerParameters.getSortTempCompressorName();
- this.tableFieldStat = new TableFieldStat(mergerParameters);
- this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
}
@Override public Void call() throws Exception {
@@ -142,14 +154,13 @@ public class IntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted sort temp row from sort temp file
+ * This method will be used to get the sorted record from file
*
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow getSortedRecordFromFile()
- throws CarbonSortKeyAndGroupByException {
- IntermediateSortTempRow row = null;
+ private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+ Object[] row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
@@ -216,7 +227,7 @@ public class IntermediateFileMerger implements Callable<Void> {
this.recordHolderHeap.add(sortTempFileChunkHolder);
}
- LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
+ LOGGER.info("Heap Size" + this.recordHolderHeap.size());
}
/**
@@ -231,12 +242,12 @@ public class IntermediateFileMerger implements Callable<Void> {
}
/**
- * This method will be used to get the sorted sort temp row
+ * Returns the next row in sorted order. Delegates to getSortedRecordFromFile(),
+ * which polls the top record holder of the merge heap.
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
- private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
+ private Object[] next() throws CarbonSortKeyAndGroupByException {
return getSortedRecordFromFile();
}
@@ -253,10 +264,62 @@ public class IntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
- * @throws IOException problem while writing
+ * @param row row to write to the merged output stream; row[0] holds the dictionary
+ * dimension values (int[]), row[1] the no-dictionary and complex column bytes
+ * (byte[][]), and measures are read through NonDictionaryUtil.getMeasure
+ * @throws IllegalArgumentException if a measure has an unsupported data type
+ * @throws CarbonSortKeyAndGroupByException problem while writing
*/
- private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
- sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
+ private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
+ try {
+ int[] mdkArray = (int[]) row[0];
+ byte[][] nonDictArray = (byte[][]) row[1];
+ int mdkIndex = 0;
+ int nonDictKeyIndex = 0;
+ // write dictionary and non dictionary dimensions in column order: a no-dictionary
+ // value is written as a short length prefix plus its bytes, a dictionary value as an int
+ for (boolean nodictinary : isNoDictionaryDimensionColumn) {
+ if (nodictinary) {
+ byte[] col = nonDictArray[nonDictKeyIndex++];
+ stream.writeShort(col.length);
+ stream.write(col);
+ } else {
+ stream.writeInt(mdkArray[mdkIndex++]);
+ }
+ }
+ // write complex: the remaining nonDictArray entries up to noDictDimCnt + complexCnt.
+ // The index is advanced only by the loop header; also incrementing it in the body
+ // would skip every other complex column.
+ for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
+ byte[] col = nonDictArray[nonDictKeyIndex];
+ stream.writeShort(col.length);
+ stream.write(col);
+ }
+ // write measure: a one-byte null flag (1 = value present, 0 = null) followed,
+ // when present, by the value encoded according to its data type
+ for (int counter = 0; counter < measureCnt; counter++) {
+ Object measure = NonDictionaryUtil.getMeasure(counter, row);
+ if (null != measure) {
+ stream.write((byte) 1);
+ DataType dataType = measureDataTypes[counter];
+ if (dataType == DataTypes.BOOLEAN) {
+ stream.writeBoolean((boolean) measure);
+ } else if (dataType == DataTypes.SHORT) {
+ stream.writeShort((short) measure);
+ } else if (dataType == DataTypes.INT) {
+ stream.writeInt((int) measure);
+ } else if (dataType == DataTypes.LONG) {
+ stream.writeLong((long) measure);
+ } else if (dataType == DataTypes.DOUBLE) {
+ stream.writeDouble((Double) measure);
+ } else if (DataTypes.isDecimal(dataType)) {
+ // decimals are length-prefixed because their byte form is variable-sized
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) measure);
+ stream.writeInt(bigDecimalInBytes.length);
+ stream.write(bigDecimalInBytes);
+ } else {
+ throw new IllegalArgumentException("unsupported data type:" + dataType);
+ }
+ } else {
+ stream.write((byte) 0);
+ }
+ }
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+ }
}
private void finish() throws CarbonSortKeyAndGroupByException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
deleted file mode 100644
index 9b6d1e8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-
-/**
- * This class is used as comparator for comparing intermediate sort temp row
- */
-public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> {
- /**
- * isSortColumnNoDictionary whether the sort column is not dictionary or not
- */
- private boolean[] isSortColumnNoDictionary;
-
- /**
- * @param isSortColumnNoDictionary isSortColumnNoDictionary
- */
- public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
- this.isSortColumnNoDictionary = isSortColumnNoDictionary;
- }
-
- /**
- * Below method will be used to compare two sort temp row
- */
- public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
- int diff = 0;
- int dictIndex = 0;
- int nonDictIndex = 0;
-
- for (boolean isNoDictionary : isSortColumnNoDictionary) {
-
- if (isNoDictionary) {
- byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
- byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
- nonDictIndex++;
-
- int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
- if (difference != 0) {
- return difference;
- }
- } else {
- int dimFieldA = rowA.getDictSortDims()[dictIndex];
- int dimFieldB = rowB.getDictSortDims()[dictIndex];
- dictIndex++;
-
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- }
- }
- return diff;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index 3f94533..d2579d2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -40,11 +40,14 @@ public class NewRowComparator implements Comparator<Object[]> {
*/
public int compare(Object[] rowA, Object[] rowB) {
int diff = 0;
+
int index = 0;
for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+
if (isNoDictionary) {
byte[] byteArr1 = (byte[]) rowA[index];
+
byte[] byteArr2 = (byte[]) rowB[index];
int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
@@ -54,7 +57,6 @@ public class NewRowComparator implements Comparator<Object[]> {
} else {
int dimFieldA = (int) rowA[index];
int dimFieldB = (int) rowB[index];
-
diff = dimFieldA - dimFieldB;
if (diff != 0) {
return diff;
@@ -63,6 +65,7 @@ public class NewRowComparator implements Comparator<Object[]> {
index++;
}
+
return diff;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index 7538c92..e01b587 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
private int numberOfSortColumns;
/**
- * NewRowComparatorForNormalDims Constructor
+ * RowComparatorForNormalDims Constructor
*
* @param numberOfSortColumns
*/
@@ -46,6 +46,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
int diff = 0;
for (int i = 0; i < numberOfSortColumns; i++) {
+
int dimFieldA = (int)rowA[i];
int dimFieldB = (int)rowB[i];
diff = dimFieldA - dimFieldB;