You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:22 UTC
[35/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
new file mode 100644
index 0000000..14ab838
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
+ */
+public class UnsafeCarbonRowPage {
+
+ private boolean[] noDictionaryDimensionMapping;
+
+ private boolean[] noDictionarySortColumnMapping;
+
+ private int dimensionSize;
+
+ private int measureSize;
+
+ private DataType[] measureDataType;
+
+ private long[] nullSetWords;
+
+ private IntPointerBuffer buffer;
+
+ private int lastSize;
+
+ private long sizeToBeUsed;
+
+ private MemoryBlock dataBlock;
+
+ private boolean saveToDisk;
+
+ private MemoryManagerType managerType;
+
+ private long taskId;
+
+ public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+ boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
+ MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+ this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+ this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+ this.dimensionSize = dimensionSize;
+ this.measureSize = measureSize;
+ this.measureDataType = type;
+ this.saveToDisk = saveToDisk;
+ this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
+ this.taskId = taskId;
+ buffer = new IntPointerBuffer(this.taskId);
+ this.dataBlock = memoryBlock;
+ // TODO Only using 98% of space for safe side.May be we can have different logic.
+ sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
+ this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
+ }
+
+ public int addRow(Object[] row) {
+ int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
+ buffer.set(lastSize);
+ lastSize = lastSize + size;
+ return size;
+ }
+
+ private int addRow(Object[] row, long address) {
+ if (row == null) {
+ throw new RuntimeException("Row is null ??");
+ }
+ int dimCount = 0;
+ int size = 0;
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe()
+ .putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ } else {
+ int value = (int) row[dimCount];
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+ size += 4;
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ }
+ Arrays.fill(nullSetWords, 0);
+ int nullSetSize = nullSetWords.length * 8;
+ int nullWordLoc = size;
+ size += nullSetSize;
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ Object value = row[mesCount + dimensionSize];
+ if (null != value) {
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ Short sval = (Short) value;
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+ size += 2;
+ break;
+ case INT:
+ Integer ival = (Integer) value;
+ CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+ size += 4;
+ break;
+ case LONG:
+ Long val = (Long) value;
+ CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+ size += 8;
+ break;
+ case DOUBLE:
+ Double doubleVal = (Double) value;
+ CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+ size += 8;
+ break;
+ case DECIMAL:
+ BigDecimal decimalVal = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size,
+ (short) bigDecimalInBytes.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported data type:" +
+ measureDataType[mesCount]);
+ }
+ set(nullSetWords, mesCount);
+ } else {
+ unset(nullSetWords, mesCount);
+ }
+ }
+ CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+ address + nullWordLoc, nullSetSize);
+ return size;
+ }
+
+ public Object[] getRow(long address, Object[] rowToFill) {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimCount] = anInt;
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ }
+
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ rowToFill[dimensionSize + mesCount] = sval;
+ break;
+ case INT:
+ Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimensionSize + mesCount] = ival;
+ break;
+ case LONG:
+ Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ break;
+ case DOUBLE:
+ Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = doubleVal;
+ break;
+ case DECIMAL:
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported data type:" +
+ measureDataType[mesCount]);
+ }
+ } else {
+ rowToFill[dimensionSize + mesCount] = null;
+ }
+ }
+ return rowToFill;
+ }
+
+ public void fillRow(long address, DataOutputStream stream) throws IOException {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ } else {
+ int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(anInt);
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ }
+
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+ for (int i = 0; i < nullSetWords.length; i++) {
+ stream.writeLong(nullSetWords[i]);
+ }
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ size += 2;
+ stream.writeShort(sval);
+ break;
+ case INT:
+ int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(ival);
+ break;
+ case LONG:
+ long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+ size += 8;
+ stream.writeLong(val);
+ break;
+ case DOUBLE:
+ double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+ size += 8;
+ stream.writeDouble(doubleVal);
+ break;
+ case DECIMAL:
+ short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ stream.writeShort(aShort);
+ stream.write(bigDecimalInBytes);
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported data type:" +
+ measureDataType[mesCount]);
+ }
+ }
+ }
+ }
+
+ public void freeMemory() {
+ switch (managerType) {
+ case UNSAFE_MEMORY_MANAGER:
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+ break;
+ default:
+ UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+ buffer.freeMemory();
+ }
+ }
+
+ public boolean isSaveToDisk() {
+ return saveToDisk;
+ }
+
+ public IntPointerBuffer getBuffer() {
+ return buffer;
+ }
+
+ public int getUsedSize() {
+ return lastSize;
+ }
+
+ public boolean canAdd() {
+ return lastSize < sizeToBeUsed;
+ }
+
+ public MemoryBlock getDataBlock() {
+ return dataBlock;
+ }
+
+ public static void set(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] |= (1L << index);
+ }
+
+ public static void unset(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] &= ~(1L << index);
+ }
+
+ public static boolean isSet(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ return ((words[wordOffset] & (1L << index)) != 0);
+ }
+
+ public boolean[] getNoDictionaryDimensionMapping() {
+ return noDictionaryDimensionMapping;
+ }
+
+ public boolean[] getNoDictionarySortColumnMapping() {
+ return noDictionarySortColumnMapping;
+ }
+
+ public void setNewDataBlock(MemoryBlock newMemoryBlock) {
+ this.dataBlock = newMemoryBlock;
+ this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
+ }
+
+ public enum MemoryManagerType {
+ UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
new file mode 100644
index 0000000..88b72aa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
+import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class UnsafeSortDataRows {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
+ /**
+ * threadStatusObserver
+ */
+ private ThreadStatusObserver threadStatusObserver;
+ /**
+ * executor service for data sort holder
+ */
+ private ExecutorService dataSorterAndWriterExecutorService;
+ /**
+ * semaphore which will used for managing sorted data object arrays
+ */
+
+ private SortParameters parameters;
+
+ private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
+
+ private UnsafeCarbonRowPage rowPage;
+
+ private final Object addRowsLock = new Object();
+
+ private long inMemoryChunkSize;
+
+ private boolean enableInMemoryIntermediateMerge;
+
+ private int bytesAdded;
+
+ private long maxSizeAllowed;
+
+ /**
+ * semaphore which will used for managing sorted data object arrays
+ */
+ private Semaphore semaphore;
+
+ private final long taskId;
+
+ public UnsafeSortDataRows(SortParameters parameters,
+ UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
+ this.parameters = parameters;
+
+ this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
+
+ // observer of writing file in thread
+ this.threadStatusObserver = new ThreadStatusObserver();
+ this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+ this.inMemoryChunkSize = inMemoryChunkSize;
+ this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
+ enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
+ CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
+
+ this.maxSizeAllowed = parameters.getBatchSortSizeinMb();
+ if (maxSizeAllowed <= 0) {
+ // If user does not input any memory size, then take half the size of usable memory configured
+ // in sort memory size.
+ this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
+ } else {
+ this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
+ }
+ }
+
+ /**
+ * This method will be used to initialize
+ */
+ public void initialize() throws MemoryException {
+ MemoryBlock baseBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean isMemoryAvailable =
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+ if (isMemoryAvailable) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+ }
+ this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
+ !isMemoryAvailable, taskId);
+ // Delete if any older file exists in sort temp folder
+ deleteSortLocationIfExists();
+
+ // create new sort temp directory
+ CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
+ this.dataSorterAndWriterExecutorService =
+ Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ semaphore = new Semaphore(parameters.getNumberOfCores());
+ }
+
+ public boolean canAdd() {
+ return bytesAdded < maxSizeAllowed;
+ }
+
+ /**
+ * This method will be used to add new row
+ *
+ * @param rowBatch new rowBatch
+ * @throws CarbonSortKeyAndGroupByException problem while writing
+ */
+ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ synchronized (addRowsLock) {
+ addBatch(rowBatch, size);
+ }
+ }
+
+ /**
+ * This method will be used to add new row
+ *
+ * @param rowBatch new rowBatch
+ * @param size
+ * @throws CarbonSortKeyAndGroupByException problem while writing
+ */
+ public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
+ throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ addBatch(rowBatch, size);
+ }
+
+ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+ for (int i = 0; i < size; i++) {
+ if (rowPage.canAdd()) {
+ bytesAdded += rowPage.addRow(rowBatch[i]);
+ } else {
+ try {
+ if (enableInMemoryIntermediateMerge) {
+ unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+ }
+ unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+ semaphore.acquire();
+ dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
+ MemoryBlock memoryBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean saveToDisk =
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+ if (!saveToDisk) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+ }
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount() + parameters.getComplexDimColCount(),
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(),
+ memoryBlock,
+ saveToDisk, taskId);
+ bytesAdded += rowPage.addRow(rowBatch[i]);
+ } catch (Exception e) {
+ LOGGER.error(
+ "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+
+ }
+ }
+ }
+
+ /**
+ * This method will be used to add new row
+ */
+ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ if (rowPage.canAdd()) {
+ rowPage.addRow(row);
+ } else {
+ try {
+ if (enableInMemoryIntermediateMerge) {
+ unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+ }
+ unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+ semaphore.acquire();
+ dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+ MemoryBlock memoryBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+ if (!saveToDisk) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+ }
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getDimColCount(), parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(), memoryBlock,
+ saveToDisk, taskId);
+ rowPage.addRow(row);
+ } catch (Exception e) {
+ LOGGER.error(
+ "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+
+ }
+ }
+
+ /**
+ * Below method will be used to start storing process This method will get
+ * all the temp files present in sort temp folder then it will create the
+ * record holder heap and then it will read first record from each file and
+ * initialize the heap
+ *
+ * @throws InterruptedException
+ */
+ public void startSorting() throws InterruptedException {
+ LOGGER.info("Unsafe based sorting will be used");
+ if (this.rowPage.getUsedSize() > 0) {
+ TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+ new UnsafeIntSortDataFormat(rowPage));
+ if (parameters.getNumberOfNoDictSortColumns() > 0) {
+ timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+ new UnsafeRowComparator(rowPage));
+ } else {
+ timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
+ new UnsafeRowComparatorForNormalDIms(rowPage));
+ }
+ unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+ } else {
+ rowPage.freeMemory();
+ }
+ startFileBasedMerge();
+ }
+
+ private void writeData(UnsafeCarbonRowPage rowPage, File file)
+ throws CarbonSortKeyAndGroupByException {
+ DataOutputStream stream = null;
+ try {
+ // open stream
+ stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+ parameters.getFileWriteBufferSize()));
+ int actualSize = rowPage.getBuffer().getActualSize();
+ // write number of entries to the file
+ stream.writeInt(actualSize);
+ for (int i = 0; i < actualSize; i++) {
+ rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
+ stream);
+ }
+
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+ } finally {
+ // close streams
+ CarbonUtil.closeStreams(stream);
+ }
+ }
+
+ /**
+ * This method will be used to delete sort temp location is it is exites
+ */
+ public void deleteSortLocationIfExists() {
+ CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
+ }
+
+ /**
+ * Below method will be used to start file based merge
+ *
+ * @throws InterruptedException
+ */
+ private void startFileBasedMerge() throws InterruptedException {
+ dataSorterAndWriterExecutorService.shutdown();
+ dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
+ }
+
+ /**
+ * Observer class for thread execution
+ * In case of any failure we need stop all the running thread
+ */
+ private class ThreadStatusObserver {
+ /**
+ * Below method will be called if any thread fails during execution
+ *
+ * @param exception
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
+ dataSorterAndWriterExecutorService.shutdownNow();
+ unsafeInMemoryIntermediateFileMerger.close();
+ parameters.getObserver().setFailed(true);
+ LOGGER.error(exception);
+ throw new CarbonSortKeyAndGroupByException(exception);
+ }
+ }
+
+ /**
+ * This class is responsible for sorting and writing the object
+ * array which holds the records equal to given array size
+ */
+ private class DataSorterAndWriter implements Runnable {
+ private UnsafeCarbonRowPage page;
+
+ public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
+ this.page = rowPage;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+ new UnsafeIntSortDataFormat(page));
+ // if sort_columns is not none, sort by sort_columns
+ if (parameters.getNumberOfNoDictSortColumns() > 0) {
+ timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+ new UnsafeRowComparator(page));
+ } else {
+ timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+ new UnsafeRowComparatorForNormalDIms(page));
+ }
+ if (page.isSaveToDisk()) {
+ // create a new file every time
+ // create a new file and pick a temp directory randomly every time
+ String tmpDir = parameters.getTempFileLocation()[
+ new Random().nextInt(parameters.getTempFileLocation().length)];
+ File sortTempFile = new File(
+ tmpDir + File.separator + parameters.getTableName()
+ + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+ writeData(page, sortTempFile);
+ LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+ + " and write is: " + (System.currentTimeMillis() - startTime));
+ page.freeMemory();
+ // add sort temp filename to and arrayList. When the list size reaches 20 then
+ // intermediate merging of sort temp files will be triggered
+ unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
+ } else {
+ // creating a new memory block as size is already allocated
+ // so calling lazy memory allocator
+ MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
+ .allocateMemoryLazy(taskId, page.getDataBlock().size());
+ // copying data from working memory manager to sortmemory manager
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
+ newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
+ page.getDataBlock().size());
+ // free unsafememory manager
+ page.freeMemory();
+ page.setNewDataBlock(newMemoryBlock);
+ // add sort temp filename to and arrayList. When the list size reaches 20 then
+ // intermediate merging of sort temp files will be triggered
+ page.getBuffer().loadToUnsafe();
+ unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
+ LOGGER.info(
+ "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
+ + (System.currentTimeMillis() - startTime));
+ }
+ } catch (Throwable e) {
+ try {
+ threadStatusObserver.notifyFailed(e);
+ } catch (CarbonSortKeyAndGroupByException ex) {
+ LOGGER.error(e);
+ }
+ } finally {
+ semaphore.release();
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
new file mode 100644
index 0000000..d02be9b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+ /**
+ * mapping of dictionary and no dictionary of sort_columns.
+ */
+ private boolean[] noDictionarySortColumnMaping;
+
+ private Object baseObject;
+
+ public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+ this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
+ this.baseObject = rowPage.getDataBlock().getBaseObject();
+ }
+
+ /**
+ * Below method will be used to compare two mdkey
+ */
+ public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+ int diff = 0;
+ long rowA = rowL.address;
+ long rowB = rowR.address;
+ int sizeA = 0;
+ int sizeB = 0;
+ for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+ if (isNoDictionary) {
+ short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA);
+ byte[] byteArr1 = new byte[aShort1];
+ sizeA += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
+ sizeA += aShort1;
+
+ short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB);
+ byte[] byteArr2 = new byte[aShort2];
+ sizeB += 2;
+ CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
+ sizeB += aShort2;
+
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+ sizeB += 4;
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ }
+
+ return diff;
+ }
+
+ /**
+ * Below method will be used to compare two mdkey
+ */
+ public int compare(UnsafeCarbonRow rowL, Object baseObjectL, UnsafeCarbonRow rowR,
+ Object baseObjectR) {
+ int diff = 0;
+ long rowA = rowL.address;
+ long rowB = rowR.address;
+ int sizeA = 0;
+ int sizeB = 0;
+ for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+ if (isNoDictionary) {
+ short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
+ byte[] byteArr1 = new byte[aShort1];
+ sizeA += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort1);
+ sizeA += aShort1;
+
+ short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
+ byte[] byteArr2 = new byte[aShort2];
+ sizeB += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ aShort2);
+ sizeB += aShort2;
+
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
+ sizeB += 4;
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ }
+
+ return diff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..483dcb2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+ private Object baseObject;
+
+ private int numberOfSortColumns;
+
+ public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
+ this.baseObject = rowPage.getDataBlock().getBaseObject();
+ this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
+ }
+
+ /**
+ * Below method will be used to compare two mdkey
+ */
+ public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+ int diff = 0;
+ long rowA = rowL.address;
+ long rowB = rowR.address;
+ int sizeA = 0;
+ int sizeB = 0;
+ for (int i = 0; i < numberOfSortColumns; i++) {
+ int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+ sizeA += 4;
+ int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+ sizeB += 4;
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+
+ return diff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
new file mode 100644
index 0000000..686e855
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Interface for merging temporary sort files/ inmemory data
+ */
+public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
+
+ boolean hasNext();
+
+ void readRow() throws CarbonSortKeyAndGroupByException;
+
+ Object[] getRow();
+
+ int numberOfRows();
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
new file mode 100644
index 0000000..2a8dc5b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRow.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
/**
 * Handle to a row stored in unsafe memory. Only the row's address is kept, so rows can be sorted
 * and merged without copying their contents.
 */
public class UnsafeCarbonRow {

  /**
   * Address of the row. Unsafe accessors read it together with the base object of the memory
   * block holding the row.
   */
  public long address;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
new file mode 100644
index 0000000..a4bb684
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+public class UnsafeCarbonRowForMerge extends UnsafeCarbonRow {
+
+ public byte index;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
new file mode 100644
index 0000000..6b0cfa6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+
+public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeFinalMergePageHolder.class.getName());
+
+ private int counter;
+
+ private int actualSize;
+
+ private long[] mergedAddresses;
+
+ private byte[] rowPageIndexes;
+
+ private UnsafeCarbonRowPage[] rowPages;
+
+ private NewRowComparator comparator;
+
+ private Object[] currentRow;
+
+ private int columnSize;
+
+ public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
+ boolean[] noDictSortColumnMapping, int columnSize) {
+ this.actualSize = merger.getEntryCount();
+ this.mergedAddresses = merger.getMergedAddresses();
+ this.rowPageIndexes = merger.getRowPageIndexes();
+ this.rowPages = merger.getUnsafeCarbonRowPages();
+ LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+ this.comparator = new NewRowComparator(noDictSortColumnMapping);
+ this.columnSize = columnSize;
+ }
+
+ public boolean hasNext() {
+ if (counter < actualSize) {
+ return true;
+ }
+ return false;
+ }
+
+ public void readRow() {
+ currentRow = new Object[columnSize];
+ rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
+ counter++;
+ }
+
+ public Object[] getRow() {
+ return currentRow;
+ }
+
+ @Override public int compareTo(SortTempChunkHolder o) {
+ return comparator.compare(currentRow, o.getRow());
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof UnsafeFinalMergePageHolder)) {
+ return false;
+ }
+
+ UnsafeFinalMergePageHolder o = (UnsafeFinalMergePageHolder) obj;
+ return this == o;
+ }
+
+ @Override public int hashCode() {
+ return super.hashCode();
+ }
+
+ public int numberOfRows() {
+ return actualSize;
+ }
+
+ public void close() {
+ for (int i = 0; i < rowPages.length; i++) {
+ rowPages[i].freeMemory();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
new file mode 100644
index 0000000..6f05088
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+
+public class UnsafeInmemoryHolder implements SortTempChunkHolder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName());
+
+ private int counter;
+
+ private int actualSize;
+
+ private UnsafeCarbonRowPage rowPage;
+
+ private Object[] currentRow;
+
+ private long address;
+
+ private NewRowComparator comparator;
+
+ private int columnSize;
+
+ public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+ int numberOfSortColumns) {
+ this.actualSize = rowPage.getBuffer().getActualSize();
+ this.rowPage = rowPage;
+ LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+ this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
+ this.columnSize = columnSize;
+ }
+
+ public boolean hasNext() {
+ if (counter < actualSize) {
+ return true;
+ }
+ return false;
+ }
+
+ public void readRow() {
+ currentRow = new Object[columnSize];
+ address = rowPage.getBuffer().get(counter);
+ rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
+ counter++;
+ }
+
+ public Object[] getRow() {
+ return currentRow;
+ }
+
+ @Override public int compareTo(SortTempChunkHolder o) {
+ return comparator.compare(currentRow, o.getRow());
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof UnsafeInmemoryHolder)) {
+ return false;
+ }
+
+ UnsafeInmemoryHolder o = (UnsafeInmemoryHolder)obj;
+
+ return this == o;
+ }
+
+ @Override public int hashCode() {
+ return super.hashCode();
+ }
+
+ public int numberOfRows() {
+ return actualSize;
+ }
+
+ public void close() {
+ rowPage.freeMemory();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
new file mode 100644
index 0000000..3b9d8d7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
+
+/**
+ * It is used for merging unsafe inmemory intermediate data
+ */
+public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
+
+ private int counter;
+
+ private int actualSize;
+
+ private UnsafeCarbonRowPage rowPage;
+
+ private UnsafeCarbonRowForMerge currentRow;
+
+ private long address;
+
+ private UnsafeRowComparator comparator;
+
+ private Object baseObject;
+
+ private byte index;
+
+ public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) {
+ this.actualSize = rowPage.getBuffer().getActualSize();
+ this.rowPage = rowPage;
+ LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+ this.comparator = new UnsafeRowComparator(rowPage);
+ this.baseObject = rowPage.getDataBlock().getBaseObject();
+ currentRow = new UnsafeCarbonRowForMerge();
+ this.index = index;
+ }
+
+ public boolean hasNext() {
+ if (counter < actualSize) {
+ return true;
+ }
+ return false;
+ }
+
+ public void readRow() {
+ address = rowPage.getBuffer().get(counter);
+ currentRow = new UnsafeCarbonRowForMerge();
+ currentRow.address = address + rowPage.getDataBlock().getBaseOffset();
+ currentRow.index = index;
+ counter++;
+ }
+
+ public UnsafeCarbonRowForMerge getRow() {
+ return currentRow;
+ }
+
+ @Override public int compareTo(UnsafeInmemoryMergeHolder o) {
+ return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof UnsafeInmemoryMergeHolder)) {
+ return false;
+ }
+
+ UnsafeInmemoryMergeHolder o = (UnsafeInmemoryMergeHolder)obj;
+ return this == o;
+ }
+
+ @Override public int hashCode() {
+ return super.hashCode();
+ }
+
+ public Object getBaseObject() {
+ return baseObject;
+ }
+
+ public void close() {
+ rowPage.freeMemory();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
new file mode 100644
index 0000000..331b9db
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.holder;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName());
+
+ /**
+ * temp file
+ */
+ private File tempFile;
+
+ /**
+ * read stream
+ */
+ private DataInputStream stream;
+
+ /**
+ * entry count
+ */
+ private int entryCount;
+
+ /**
+ * return row
+ */
+ private Object[] returnRow;
+
+ /**
+ * number of measures
+ */
+ private int measureCount;
+
+ /**
+ * number of dimensionCount
+ */
+ private int dimensionCount;
+
+ /**
+ * number of complexDimensionCount
+ */
+ private int complexDimensionCount;
+
+ /**
+ * fileBufferSize for file reader stream size
+ */
+ private int fileBufferSize;
+
+ private Object[][] currentBuffer;
+
+ private Object[][] backupBuffer;
+
+ private boolean isBackupFilled;
+
+ private boolean prefetch;
+
+ private int bufferSize;
+
+ private int bufferRowCounter;
+
+ private ExecutorService executorService;
+
+ private Future<Void> submit;
+
+ private int prefetchRecordsProceesed;
+
+ /**
+ * sortTempFileNoOFRecordsInCompression
+ */
+ private int sortTempFileNoOFRecordsInCompression;
+
+ /**
+ * isSortTempFileCompressionEnabled
+ */
+ private boolean isSortTempFileCompressionEnabled;
+
+ /**
+ * totalRecordFetch
+ */
+ private int totalRecordFetch;
+
+ private int noDictionaryCount;
+
+ private DataType[] measureDataType;
+
+ private int numberOfObjectRead;
+ /**
+ * to store whether dimension is of dictionary type or not
+ */
+ private boolean[] isNoDictionaryDimensionColumn;
+
+ private int nullSetWordsLength;
+
+ private Comparator<Object[]> comparator;
+
+ /**
+ * Constructor to initialize
+ */
+ public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
+ // set temp file
+ this.tempFile = tempFile;
+
+ // set measure and dimension count
+ this.measureCount = parameters.getMeasureColCount();
+ this.dimensionCount = parameters.getDimColCount();
+ this.complexDimensionCount = parameters.getComplexDimColCount();
+
+ this.noDictionaryCount = parameters.getNoDictionaryCount();
+ // set mdkey length
+ this.fileBufferSize = parameters.getFileBufferSize();
+ this.executorService = Executors.newFixedThreadPool(1);
+ this.measureDataType = parameters.getMeasureDataType();
+ this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+ this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
+ comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
+ initialize();
+ }
+
+ /**
+ * This method will be used to initialize
+ *
+ * @throws CarbonSortKeyAndGroupByException problem while initializing
+ */
+ public void initialize() {
+ prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
+ CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
+ bufferSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
+ this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+ CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
+ if (this.isSortTempFileCompressionEnabled) {
+ LOGGER.info("Compression was used while writing the sortTempFile");
+ }
+
+ try {
+ this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+ if (this.sortTempFileNoOFRecordsInCompression < 1) {
+ LOGGER.error("Invalid value for: "
+ + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
+ + " be used");
+
+ this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.error(
+ "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ", only Positive Integer value is allowed.Default value will be used");
+ this.sortTempFileNoOFRecordsInCompression = Integer
+ .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+
+ initialise();
+ }
+
+ private void initialise() {
+ try {
+ if (isSortTempFileCompressionEnabled) {
+ this.bufferSize = sortTempFileNoOFRecordsInCompression;
+ }
+ stream = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+ this.entryCount = stream.readInt();
+ LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
+ if (prefetch) {
+ new DataFetcher(false).call();
+ totalRecordFetch += currentBuffer.length;
+ if (totalRecordFetch < this.entryCount) {
+ submit = executorService.submit(new DataFetcher(true));
+ }
+ } else {
+ if (isSortTempFileCompressionEnabled) {
+ new DataFetcher(false).call();
+ }
+ }
+
+ } catch (FileNotFoundException e) {
+ LOGGER.error(e);
+ throw new RuntimeException(tempFile + " No Found", e);
+ } catch (IOException e) {
+ LOGGER.error(e);
+ throw new RuntimeException(tempFile + " No Found", e);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw new RuntimeException(tempFile + " Problem while reading", e);
+ }
+ }
+
+  /**
+   * Advances to the next row of the file; the row is then available through {@link #getRow()}.
+   * Callers are expected to check {@link #hasNext()} before each call.
+   *
+   * @throws CarbonSortKeyAndGroupByException problem while reading
+   */
+  public void readRow() throws CarbonSortKeyAndGroupByException {
+    if (prefetch) {
+      fillDataForPrefetch();
+    } else if (isSortTempFileCompressionEnabled) {
+      if (bufferRowCounter >= bufferSize) {
+        // Every buffered row has been handed out at this point, so the rows already pulled from
+        // the stream equal the rows processed. DataFetcher sizes its read from totalRecordFetch,
+        // which nothing else advances on this non-prefetch path; syncing it here stops the final,
+        // shorter chunk from being over-read past end of file.
+        totalRecordFetch = prefetchRecordsProceesed;
+        try {
+          new DataFetcher(false).call();
+          bufferRowCounter = 0;
+        } catch (Exception e) {
+          LOGGER.error(e);
+          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+        }
+      }
+      prefetchRecordsProceesed++;
+      returnRow = currentBuffer[bufferRowCounter++];
+    } else {
+      this.returnRow = getRowFromStream();
+      // hasNext() compares this counter with entryCount on the plain-stream path
+      this.numberOfObjectRead++;
+    }
+  }
+
+  /**
+   * Serves the next row in prefetch mode. When the current buffer is exhausted, this waits for
+   * the background fetch and swaps its buffer in. It then schedules the next background fetch if
+   * the file still has unfetched rows.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the background fetch failed or was interrupted
+   */
+  private void fillDataForPrefetch() throws CarbonSortKeyAndGroupByException {
+    if (bufferRowCounter >= bufferSize) {
+      awaitBackupBuffer();
+      bufferRowCounter = 0;
+      currentBuffer = backupBuffer;
+      isBackupFilled = false;
+      totalRecordFetch += currentBuffer.length;
+      if (totalRecordFetch < this.entryCount) {
+        submit = executorService.submit(new DataFetcher(true));
+      }
+    }
+    prefetchRecordsProceesed++;
+    returnRow = currentBuffer[bufferRowCounter++];
+  }
+
+  /**
+   * Blocks until the pending background fetch completes and checks that it filled the backup
+   * buffer. The wait happens even if the fetch already finished, because Future.get() is what
+   * makes the fetcher thread's writes to backupBuffer/isBackupFilled visible here.
+   */
+  private void awaitBackupBuffer() throws CarbonSortKeyAndGroupByException {
+    try {
+      submit.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+    }
+    if (!isBackupFilled) {
+      // the fetcher returned without setting the flag, i.e. its read failed
+      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading",
+          new IllegalStateException("background fetch did not fill the backup buffer"));
+    }
+  }
+
+ /**
+ * Decodes one row from {@code stream}. Fields are read in this order:
+ * <ul>
+ * <li>one entry per element of {@code isNoDictionaryDimensionColumn}: a short length followed
+ * by that many bytes for a no-dictionary column, otherwise a single int;</li>
+ * <li>each remaining dimension up to {@code dimensionCount} (complex dimensions): a short
+ * length followed by that many bytes;</li>
+ * <li>{@code nullSetWordsLength} longs, tested per measure with
+ * {@code UnsafeCarbonRowPage.isSet};</li>
+ * <li>for each measure whose bit is set, a value read according to
+ * {@code measureDataType}; DECIMAL is a short length plus bytes decoded with
+ * {@code DataTypeUtil.byteToBigDecimal}.</li>
+ * </ul>
+ * Measures whose bit is not set are not read and stay {@code null} in the returned row.
+ *
+ * @return row of length {@code dimensionCount + measureCount}, dimensions first
+ * @throws CarbonSortKeyAndGroupByException wrapping any IOException from the stream, including
+ * reaching end of file before the row is complete
+ * @throws IllegalArgumentException if a set measure has an unsupported data type
+ */
+ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+ Object[] row = new Object[dimensionCount + measureCount];
+ try {
+ int dimCount = 0;
+ // dimCount is shared with the complex-dimension loop below, which continues from here
+ for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+ if (isNoDictionaryDimensionColumn[dimCount]) {
+ // NOTE(review): lengths are read as signed shorts, so a length above Short.MAX_VALUE
+ // would be negative here - confirm the writer never emits one.
+ short aShort = stream.readShort();
+ byte[] col = new byte[aShort];
+ stream.readFully(col);
+ row[dimCount] = col;
+ } else {
+ int anInt = stream.readInt();
+ row[dimCount] = anInt;
+ }
+ }
+
+ // read complex dimensions here: each is a short length followed by that many bytes
+ for (; dimCount < dimensionCount; dimCount++) {
+ short aShort = stream.readShort();
+ byte[] col = new byte[aShort];
+ stream.readFully(col);
+ row[dimCount] = col;
+ }
+
+ // null-indicator words: a measure value is present in the stream only if its bit is set
+ long[] words = new long[nullSetWordsLength];
+ for (int i = 0; i < words.length; i++) {
+ words[i] = stream.readLong();
+ }
+
+ for (int mesCount = 0; mesCount < measureCount; mesCount++) {
+ if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ row[dimensionCount + mesCount] = stream.readShort();
+ break;
+ case INT:
+ row[dimensionCount + mesCount] = stream.readInt();
+ break;
+ case LONG:
+ row[dimensionCount + mesCount] = stream.readLong();
+ break;
+ case DOUBLE:
+ row[dimensionCount + mesCount] = stream.readDouble();
+ break;
+ case DECIMAL:
+ short aShort = stream.readShort();
+ byte[] bigDecimalInBytes = new byte[aShort];
+ stream.readFully(bigDecimalInBytes);
+ row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported data type:" +
+ measureDataType[mesCount]);
+ }
+ }
+ }
+ return row;
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+ }
+
+ /**
+ * below method will be used to get the row
+ *
+ * @return row
+ */
+ public Object[] getRow() {
+ return this.returnRow;
+ }
+
+ /**
+ * below method will be used to check whether any more records are present
+ * in file or not
+ *
+ * @return more row present in file
+ */
+ public boolean hasNext() {
+ if (prefetch || isSortTempFileCompressionEnabled) {
+ return this.prefetchRecordsProceesed < this.entryCount;
+ }
+ return this.numberOfObjectRead < this.entryCount;
+ }
+
+  /**
+   * Closes the underlying file stream, then shuts down the executor used for background
+   * prefetching.
+   */
+  public void close() {
+    CarbonUtil.closeStreams(this.stream);
+    this.executorService.shutdown();
+  }
+
+  /**
+   * Returns the number of rows in the temp file, as read from the count at its start.
+   *
+   * @return entryCount
+   */
+  public int numberOfRows() {
+    final int rows = this.entryCount;
+    return rows;
+  }
+
+ @Override public int compareTo(SortTempChunkHolder other) {
+ return comparator.compare(returnRow, other.getRow());
+ }
+
+  /**
+   * Identity equality: a holder is equal only to itself.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this;
+  }
+
+  /**
+   * Hash built from the column counts and the temp file. This is trivially consistent with
+   * {@code equals}, because only the same instance is equal.
+   */
+  @Override
+  public int hashCode() {
+    int columnCounts = measureCount + dimensionCount;
+    columnCounts += complexDimensionCount;
+    columnCounts += noDictionaryCount;
+    return 31 * columnCounts + tempFile.hashCode();
+  }
+
+  /**
+   * Reads the next batch of rows from the stream into {@code currentBuffer} or, when filling the
+   * backup, into {@code backupBuffer} (then sets {@code isBackupFilled}). The batch size is fixed
+   * at construction as min(bufferSize, entryCount - totalRecordFetch).
+   */
+  private final class DataFetcher implements Callable<Void> {
+    private final boolean isBackUpFilling;
+
+    private final int numberOfRecords;
+
+    private DataFetcher(boolean backUp) {
+      isBackUpFilling = backUp;
+      numberOfRecords = calculateNumberOfRecordsToBeFetched();
+    }
+
+    /** Returns how many rows this fetch should read: a full buffer, or whatever is left. */
+    private int calculateNumberOfRecordsToBeFetched() {
+      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
+      return Math.min(bufferSize, numberOfRecordsLeftToBeRead);
+    }
+
+    /**
+     * Performs the read.
+     *
+     * @throws Exception any read failure, logged and then propagated so callers (directly, or
+     *                   through Future.get()) do not continue with a stale or missing buffer
+     */
+    @Override public Void call() throws Exception {
+      try {
+        if (isBackUpFilling) {
+          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
+          isBackupFilled = true;
+        } else {
+          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw e;
+      }
+      return null;
+    }
+
+  }
+
+  /**
+   * Reads {@code numberOfRecords} consecutive rows from the temp file into a new array.
+   *
+   * @param numberOfRecords how many rows to read
+   * @return the rows, in file order
+   * @throws CarbonSortKeyAndGroupByException if a row cannot be read
+   */
+  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+      throws CarbonSortKeyAndGroupByException {
+    Object[][] batch = new Object[numberOfRecords][];
+    int filled = 0;
+    while (filled < numberOfRecords) {
+      batch[filled++] = getRowFromStream();
+    }
+    return batch;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
new file mode 100644
index 0000000..3955864
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.merger;
+
+import java.util.AbstractQueue;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRowForMerge;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());
+
+ /**
+ * recordHolderHeap
+ */
+ private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;
+
+ /**
+ * fileCounter
+ */
+ private int holderCounter;
+
+ /**
+ * entryCount
+ */
+ private int entryCount;
+
+ private UnsafeCarbonRowPage[] unsafeCarbonRowPages;
+
+ private long[] mergedAddresses;
+
+ private byte[] rowPageIndexes;
+
+ /**
+ * IntermediateFileMerger Constructor
+ */
+ public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
+ int totalSize) {
+ this.holderCounter = unsafeCarbonRowPages.length;
+ this.unsafeCarbonRowPages = unsafeCarbonRowPages;
+ this.mergedAddresses = new long[totalSize];
+ this.rowPageIndexes = new byte[totalSize];
+ this.entryCount = 0;
+ }
+
+ @Override
+ public void run() {
+ long intermediateMergeStartTime = System.currentTimeMillis();
+ int holderCounterConst = holderCounter;
+ try {
+ startSorting();
+ while (hasNext()) {
+ writeDataToMemory(next());
+ }
+ double intermediateMergeCostTime =
+ (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+ LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
+ + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
+ } catch (Exception e) {
+ LOGGER.error(e, "Problem while intermediate merging");
+ }
+ }
+
+ /**
+ * This method will be used to get the sorted record from file
+ *
+ * @return sorted record sorted record
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ private UnsafeCarbonRowForMerge getSortedRecordFromMemory()
+ throws CarbonSortKeyAndGroupByException {
+ UnsafeCarbonRowForMerge row = null;
+
+ // poll the top object from heap
+ // heap maintains binary tree which is based on heap condition that will
+ // be based on comparator we are passing the heap
+ // when will call poll it will always delete root of the tree and then
+ // it does trickel down operation complexity is log(n)
+ UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();
+
+ // get the row from chunk
+ row = poll.getRow();
+
+ // check if there no entry present
+ if (!poll.hasNext()) {
+ // change the file counter
+ --this.holderCounter;
+
+ // reaturn row
+ return row;
+ }
+
+ // read new row
+ poll.readRow();
+
+ // add to heap
+ this.recordHolderHeap.add(poll);
+
+ // return row
+ return row;
+ }
+
+ /**
+ * Below method will be used to start storing process This method will get
+ * all the temp files present in sort temp folder then it will create the
+ * record holder heap and then it will read first record from each file and
+ * initialize the heap
+ *
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ private void startSorting() throws CarbonSortKeyAndGroupByException {
+ LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
+
+ // create record holder heap
+ createRecordHolderQueue(unsafeCarbonRowPages);
+
+ // iterate over file list and create chunk holder and add to heap
+ LOGGER.info("Started adding first record from row page");
+
+ UnsafeInmemoryMergeHolder unsafePageHolder = null;
+ byte index = 0;
+ for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
+ // create chunk holder
+ unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
+
+ // initialize
+ unsafePageHolder.readRow();
+
+ // add to heap
+ this.recordHolderHeap.add(unsafePageHolder);
+ }
+
+ LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+ }
+
+ /**
+ * This method will be used to create the heap which will be used to hold
+ * the chunk of data
+ */
+ private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
+ // creating record holder heap
+ this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
+ }
+
+ /**
+ * This method will be used to get the sorted row
+ *
+ * @return sorted row
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
+ return getSortedRecordFromMemory();
+ }
+
+ /**
+ * This method will be used to check whether any more element is present or
+ * not
+ *
+ * @return more element is present
+ */
+ private boolean hasNext() {
+ return this.holderCounter > 0;
+ }
+
+ /**
+ * Below method will be used to write data to file
+ */
+ private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
+ mergedAddresses[entryCount] = row.address;
+ rowPageIndexes[entryCount] = row.index;
+ entryCount++;
+ }
+
+ public int getEntryCount() {
+ return entryCount;
+ }
+
+ public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
+ return unsafeCarbonRowPages;
+ }
+
+ public long[] getMergedAddresses() {
+ return mergedAddresses;
+ }
+
+ public byte[] getRowPageIndexes() {
+ return rowPageIndexes;
+ }
+}