You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:21 UTC
[05/20] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
new file mode 100644
index 0000000..0883ae1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+/**
+ * This class is used as comparator for comparing dims which are non high cardinality dims.
+ * The values of the sort columns are read as ints (surrogates) through
+ * NonDictionaryUtil.getDimension and compared numerically.
+ */
+public class RowComparatorForNormalDims implements Comparator<Object[]> {
+  /**
+   * number of leading dimensions that take part in the comparison
+   */
+  private final int numberOfSortColumns;
+
+  /**
+   * RowComparatorForNormalDims Constructor
+   *
+   * @param numberOfSortColumns number of sort columns to compare, starting at index 0
+   */
+  public RowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  /**
+   * Compares two rows sort column by sort column and stops at the first column that differs.
+   *
+   * @param rowA first row
+   * @param rowB second row
+   * @return a negative value, zero or a positive value when rowA sorts before, equal to or
+   *         after rowB
+   * @see Comparator#compare(Object, Object)
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
+      int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
+
+      // Integer.compare instead of (a - b): the subtraction can overflow and flip the sign
+      int diff = Integer.compare(dimFieldA, dimFieldB);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
new file mode 100644
index 0000000..6d6ff94
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SingleThreadFinalSortFilesMerger.class.getName());
+
+ /**
+ * lockObject
+ */
+ private static final Object LOCKOBJECT = new Object();
+
+ /**
+ * fileCounter
+ */
+ private int fileCounter;
+
+ /**
+ * fileBufferSize
+ */
+ private int fileBufferSize;
+
+ /**
+ * recordHolderHeap
+ */
+ private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal;
+
+ /**
+ * tableName
+ */
+ private String tableName;
+
+ /**
+ * measureCount
+ */
+ private int measureCount;
+
+ /**
+ * dimensionCount
+ */
+ private int dimensionCount;
+
+ /**
+ * noDictionaryCount
+ */
+ private int noDictionaryCount;
+
+ /**
+ * complexDimensionCount
+ */
+ private int complexDimensionCount;
+
+ /**
+ * tempFileLocation
+ */
+ private String[] tempFileLocation;
+
+ private DataType[] measureDataType;
+
+ /**
+ * below code is to check whether dimension
+ * is of no dictionary type or not
+ */
+ private boolean[] isNoDictionaryColumn;
+
+ private boolean[] isNoDictionarySortColumn;
+
+  /**
+   * Stores the temp locations and the column layout; no file is touched here.
+   *
+   * @param tempFileLocation directories that are scanned for the files to merge
+   * @param tableName name prefix used to select the files inside those directories
+   * @param dimensionCount handed to every chunk holder
+   * @param complexDimensionCount handed to every chunk holder
+   * @param measureCount handed to every chunk holder
+   * @param noDictionaryCount handed to every chunk holder
+   * @param type data type of each measure, handed to every chunk holder
+   * @param isNoDictionaryColumn per dimension flag, handed to every chunk holder
+   * @param isNoDictionarySortColumn per sort column flag, handed to every chunk holder
+   */
+  public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
+      int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
+      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
+    // where the files are and how they are recognised
+    this.tableName = tableName;
+    this.tempFileLocation = tempFileLocation;
+
+    // column counts
+    this.measureCount = measureCount;
+    this.dimensionCount = dimensionCount;
+    this.noDictionaryCount = noDictionaryCount;
+    this.complexDimensionCount = complexDimensionCount;
+
+    // per column metadata
+    this.measureDataType = type;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+    this.isNoDictionaryColumn = isNoDictionaryColumn;
+  }
+
+  /**
+   * Collects the files found in the temp locations and, when there is at least one, prepares
+   * the heap that is used to merge them. Nothing happens apart from a log entry when no file
+   * is found.
+   *
+   * @throws CarbonDataWriterException if the heap cannot be prepared
+   */
+  public void startFinalMerge() throws CarbonDataWriterException {
+    List<File> candidates = getFilesToMergeSort();
+    if (!candidates.isEmpty()) {
+      startSorting(candidates);
+      return;
+    }
+    LOGGER.info("No file to merge in final merge stage");
+  }
+
+  /**
+   * Lists, across every temp location, the files whose name starts with the table name.
+   * A location that cannot be listed (listFiles returns null) or that has no matching file
+   * contributes nothing.
+   *
+   * @return the files to merge, possibly empty
+   */
+  private List<File> getFilesToMergeSort() {
+    List<File> merged = new ArrayList<File>(tempFileLocation.length);
+    FileFilter tablePrefixFilter = new FileFilter() {
+      @Override public boolean accept(File candidate) {
+        return candidate.getName().startsWith(tableName);
+      }
+    };
+
+    for (int i = 0; i < tempFileLocation.length; i++) {
+      File[] matches = new File(tempFileLocation[i]).listFiles(tablePrefixFilter);
+      if (null == matches || matches.length == 0) {
+        continue;
+      }
+      merged.addAll(Arrays.asList(matches));
+    }
+    return merged;
+  }
+
+  /**
+   * Builds the merge heap. One SortTempFileChunkHolder is created per file; a pool of reader
+   * threads initialises each holder and reads its first row, then the holder is added to the
+   * heap.
+   * <p>
+   * A holder whose initialisation or first read fails is not added to the heap; the first
+   * such failure is rethrown to the caller once all readers are done.
+   *
+   * @param files sort temp files to merge
+   * @throws CarbonDataWriterException if a file cannot be initialised or read, if the readers
+   *                                   do not finish within two hours, or if the calling thread
+   *                                   is interrupted while waiting for them
+   */
+  private void startSorting(List<File> files) throws CarbonDataWriterException {
+    this.fileCounter = files.size();
+    if (fileCounter == 0) {
+      LOGGER.info("No files to merge sort");
+      return;
+    }
+    this.fileBufferSize = CarbonDataProcessorUtil
+        .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
+            CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+    LOGGER.info("Number of temp file: " + this.fileCounter);
+
+    LOGGER.info("File Buffer Size: " + this.fileBufferSize);
+
+    // create record holder heap
+    createRecordHolderQueue();
+
+    // iterate over file list and create chunk holder and add to heap
+    LOGGER.info("Started adding first record from each file");
+    int maxThreadForSorting;
+    try {
+      maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
+              CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
+    } catch (NumberFormatException e) {
+      maxThreadForSorting =
+          Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
+    }
+    // newFixedThreadPool rejects a pool size below one with IllegalArgumentException
+    maxThreadForSorting = Math.max(1, maxThreadForSorting);
+
+    // first failure reported by a reader thread, guarded by LOCKOBJECT
+    final Exception[] readerFailure = new Exception[1];
+    ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
+
+    for (final File tempFile : files) {
+
+      Runnable runnable = new Runnable() {
+        @Override public void run() {
+
+          // create chunk holder
+          SortTempFileChunkHolder sortTempFileChunkHolder =
+              new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
+                  measureCount, fileBufferSize, noDictionaryCount, measureDataType,
+                  isNoDictionaryColumn, isNoDictionarySortColumn);
+          try {
+            // initialize
+            sortTempFileChunkHolder.initialize();
+            sortTempFileChunkHolder.readRow();
+          } catch (Exception ex) {
+            LOGGER.error(ex);
+            synchronized (LOCKOBJECT) {
+              if (null == readerFailure[0]) {
+                readerFailure[0] = ex;
+              }
+            }
+            // a holder without a first row must not take part in the merge
+            return;
+          }
+
+          synchronized (LOCKOBJECT) {
+            recordHolderHeapLocal.add(sortTempFileChunkHolder);
+          }
+        }
+      };
+      service.execute(runnable);
+    }
+    service.shutdown();
+
+    boolean finished;
+    try {
+      finished = service.awaitTermination(2, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      service.shutdownNow();
+      // keep the interrupt visible to the callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new CarbonDataWriterException(e.getMessage(), e);
+    }
+    if (!finished) {
+      // readers are still running: the heap is incomplete and still being written to
+      service.shutdownNow();
+      String message = "Timed out while reading the first record of the sort temp files";
+      throw new CarbonDataWriterException(message,
+          new java.util.concurrent.TimeoutException(message));
+    }
+
+    Exception failure;
+    synchronized (LOCKOBJECT) {
+      failure = readerFailure[0];
+    }
+    if (null != failure) {
+      throw new CarbonDataWriterException(failure.getMessage(), failure);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+  }
+
+  /**
+   * Allocates the priority queue that holds one chunk holder per file. The current file
+   * counter is used as initial capacity and no comparator is passed, so the holders are
+   * ordered by their natural ordering.
+   */
+  private void createRecordHolderQueue() {
+    PriorityQueue<SortTempFileChunkHolder> heap =
+        new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
+    this.recordHolderHeapLocal = heap;
+  }
+
+ /**
+ * This method will be used to get the sorted row
+ *
+ * @return sorted row
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ public Object[] next() {
+ return getSortedRecordFromFile();
+ }
+
+  /**
+   * Takes the holder at the head of the heap, returns its current row and either advances
+   * the holder and puts it back, or closes it when it has no further row.
+   *
+   * @return current row of the holder that was at the head of the heap
+   * @throws CarbonDataWriterException if the next row of that holder cannot be read
+   */
+  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
+    // poll removes the head of the priority queue, which then restores its heap order
+    SortTempFileChunkHolder head = this.recordHolderHeapLocal.poll();
+    Object[] currentRow = head.getRow();
+
+    if (head.hasNext()) {
+      // more rows in this file: load the next one and let the heap reposition the holder
+      try {
+        head.readRow();
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataWriterException(e.getMessage(), e);
+      }
+      this.recordHolderHeapLocal.add(head);
+    } else {
+      // file exhausted: release its stream and count it out
+      head.closeStream();
+      --this.fileCounter;
+    }
+    return currentRow;
+  }
+
+  /**
+   * Tells whether a further row can be fetched, which is the case as long as the file
+   * counter is above zero.
+   *
+   * @return true when at least one file is still counted as open
+   */
+  public boolean hasNext() {
+    return 0 < this.fileCounter;
+  }
+
+  /**
+   * Drops the reference to the heap.
+   */
+  public void clear() {
+    // assigning null to an already null reference changes nothing, so no check is needed
+    recordHolderHeapLocal = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
new file mode 100644
index 0000000..fc744a6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class SortDataRows {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SortDataRows.class.getName());
+ /**
+ * entryCount
+ */
+ private int entryCount;
+ /**
+ * record holder array
+ */
+ private Object[][] recordHolderList;
+ /**
+ * threadStatusObserver
+ */
+ private ThreadStatusObserver threadStatusObserver;
+ /**
+ * executor service for data sort holder
+ */
+ private ExecutorService dataSorterAndWriterExecutorService;
+ /**
+ * semaphore which will used for managing sorted data object arrays
+ */
+ private Semaphore semaphore;
+
+ private SortParameters parameters;
+
+ private int sortBufferSize;
+
+ private SortIntermediateFileMerger intermediateFileMerger;
+
+ private final Object addRowsLock = new Object();
+
+  /**
+   * Keeps the collaborators and derives the size of the row buffer, which is the larger of
+   * the configured sort buffer size and the batch size read from CarbonProperties.
+   *
+   * @param parameters sort configuration
+   * @param intermediateFileMerger merger that receives the sort temp files written here
+   */
+  public SortDataRows(SortParameters parameters,
+      SortIntermediateFileMerger intermediateFileMerger) {
+    this.intermediateFileMerger = intermediateFileMerger;
+    this.parameters = parameters;
+
+    // observer of writing file in thread
+    this.threadStatusObserver = new ThreadStatusObserver();
+
+    int configuredBatchSize = CarbonProperties.getInstance().getBatchSize();
+    this.sortBufferSize = Math.max(parameters.getSortBufferSize(), configuredBatchSize);
+  }
+
+ /**
+ * This method will be used to initialize
+ */
+ public void initialize() throws CarbonSortKeyAndGroupByException {
+
+ // create holder list which will hold incoming rows
+ // size of list will be sort buffer size + 1 to avoid creation of new
+ // array in list array
+ this.recordHolderList = new Object[sortBufferSize][];
+ // Delete if any older file exists in sort temp folder
+ deleteSortLocationIfExists();
+
+ // create new sort temp directory
+ CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
+ this.dataSorterAndWriterExecutorService =
+ Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ semaphore = new Semaphore(parameters.getNumberOfCores());
+ }
+
+  /**
+   * Adds one row to the buffer. When the buffer is already full it is first handed to the
+   * sorter executor, which sorts it and writes it to a sort temp file, and a fresh buffer
+   * is started. The call blocks until a semaphore permit is available.
+   * <p>
+   * Unlike addRowBatch this method does not take the add-rows lock.
+   *
+   * @param row new row
+   * @throws CarbonSortKeyAndGroupByException if the thread is interrupted while waiting for
+   *                                          a permit
+   */
+  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    int currentSize = entryCount;
+
+    if (sortBufferSize == currentSize) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("************ Writing to temp file ********** ");
+      }
+      intermediateFileMerger.startMergingIfPossible();
+      Object[][] recordHolderListLocal = recordHolderList;
+      try {
+        semaphore.acquire();
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the callers and keep the cause in the exception
+        Thread.currentThread().interrupt();
+        LOGGER.error(
+            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+        throw new CarbonSortKeyAndGroupByException(e.getMessage(), e);
+      }
+      try {
+        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
+      } catch (RuntimeException e) {
+        // the task never runs, so its finally block will not give the permit back
+        semaphore.release();
+        throw e;
+      }
+      // create the new holder Array
+      this.recordHolderList = new Object[this.sortBufferSize][];
+      this.entryCount = 0;
+    }
+    recordHolderList[entryCount++] = row;
+  }
+
+  /**
+   * Adds the first {@code size} rows of a batch to the buffer. When the batch fills the
+   * buffer, the buffer is topped up with the head of the batch, handed to the sorter
+   * executor and replaced by a fresh one that receives the rest of the batch. The whole
+   * operation runs under the add-rows lock.
+   * <p>
+   * NOTE(review): the rest of the batch is copied into the fresh buffer without a size check,
+   * so this assumes a batch never exceeds the buffer capacity (the constructor sizes the
+   * buffer to at least the configured batch size) - confirm against the callers.
+   *
+   * @param rowBatch rows to add
+   * @param size number of rows of rowBatch to add, taken from index 0
+   * @throws CarbonSortKeyAndGroupByException if the thread is interrupted while waiting for
+   *                                          a permit or the buffer cannot be submitted
+   */
+  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    synchronized (addRowsLock) {
+      int sizeLeft = 0;
+      if (entryCount + size >= sortBufferSize) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("************ Writing to temp file ********** ");
+        }
+        intermediateFileMerger.startMergingIfPossible();
+        Object[][] recordHolderListLocal = recordHolderList;
+        // top up the current buffer with the head of the batch before handing it over
+        sizeLeft = sortBufferSize - entryCount;
+        if (sizeLeft > 0) {
+          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+        }
+        try {
+          semaphore.acquire();
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to the callers
+          Thread.currentThread().interrupt();
+          LOGGER.error(
+              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+        try {
+          dataSorterAndWriterExecutorService
+              .execute(new DataSorterAndWriter(recordHolderListLocal));
+        } catch (Exception e) {
+          // the task never runs, so its finally block will not give the permit back
+          semaphore.release();
+          LOGGER.error(
+              "exception occurred while submitting the sort buffer: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+        // create the new holder Array
+        this.recordHolderList = new Object[this.sortBufferSize][];
+        this.entryCount = 0;
+        size = size - sizeLeft;
+        if (size == 0) {
+          return;
+        }
+      }
+      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
+      entryCount += size;
+    }
+  }
+
+  /**
+   * Flushes the rows that are still buffered: they are sorted and written to one more sort
+   * temp file in a randomly chosen temp location. Afterwards the sorter executor is shut
+   * down and awaited, and the buffer is released.
+   *
+   * @throws CarbonSortKeyAndGroupByException if writing the file or waiting for the executor
+   *                                          fails
+   */
+  public void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("File based sorting will be used");
+    if (this.entryCount > 0) {
+      // sort only the filled part of the buffer
+      Object[][] toSort = Arrays.copyOf(recordHolderList, entryCount);
+      if (parameters.getNumberOfNoDictSortColumns() <= 0) {
+        Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+      } else {
+        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
+      }
+      recordHolderList = toSort;
+
+      // create new file and choose folder randomly
+      String[] tmpLocation = parameters.getTempFileLocation();
+      int pick = new Random().nextInt(tmpLocation.length);
+      File file = new File(
+          tmpLocation[pick] + File.separator + parameters.getTableName() +
+              System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      writeDataTofile(recordHolderList, this.entryCount, file);
+    }
+
+    startFileBasedMerge();
+    this.recordHolderList = null;
+  }
+
+  /**
+   * Writes sorted rows to a file. The temp sort file writer is used when compression or
+   * prefetch is enabled, the plain stream format otherwise.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal number of rows to write
+   * @param file target file
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    boolean useTempFileWriter =
+        parameters.isSortFileCompressionEnabled() || parameters.isPrefetch();
+    if (useTempFileWriter) {
+      writeSortTempFile(recordHolderList, entryCountLocal, file);
+    } else {
+      writeData(recordHolderList, entryCountLocal, file);
+    }
+  }
+
+  /**
+   * Writes the rows through the writer returned by getWriter. The writer is always finished,
+   * also when writing fails.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal number of rows, passed to the writer on initialisation
+   * @param file target file
+   * @throws CarbonSortKeyAndGroupByException problem while writing, logged and rethrown
+   */
+  private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    // getWriter always returns a new writer, so it can be obtained before the try block
+    TempSortFileWriter tempWriter = getWriter();
+    try {
+      tempWriter.initiaize(file, entryCountLocal);
+      tempWriter.writeSortTempFile(recordHolderList);
+    } catch (CarbonSortKeyAndGroupByException writeFailure) {
+      LOGGER.error(writeFailure, "Problem while writing the sort temp file");
+      throw writeFailure;
+    } finally {
+      tempWriter.finish();
+    }
+  }
+
+  /**
+   * Writes the rows to a file in the plain stream format: the number of rows first, then per
+   * row the dimensions covered by the no dictionary mapping (length prefixed bytes for no
+   * dictionary columns, an int otherwise), the remaining dimensions as length prefixed bytes
+   * and finally every measure as a presence byte followed by its value.
+   *
+   * @param recordHolderList rows to write
+   * @param entryCountLocal number of rows to write
+   * @param file target file
+   * @throws CarbonSortKeyAndGroupByException if an I/O error occurs
+   */
+  private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    DataOutputStream out = null;
+    try {
+      out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
+
+      // header: number of entries in the file
+      out.writeInt(entryCountLocal);
+
+      int complexDimColCount = parameters.getComplexDimColCount();
+      int dimColCount = parameters.getDimColCount() + complexDimColCount;
+      DataType[] measureTypes = parameters.getMeasureDataType();
+      boolean[] noDictionaryMapping = parameters.getNoDictionaryDimnesionColumn();
+
+      for (int rowIndex = 0; rowIndex < entryCountLocal; rowIndex++) {
+        Object[] row = recordHolderList[rowIndex];
+        int column = 0;
+
+        // dictionary and no dictionary dimensions
+        while (column < noDictionaryMapping.length) {
+          if (!noDictionaryMapping[column]) {
+            out.writeInt((int)row[column]);
+          } else {
+            // the length is written as a short
+            byte[] col = (byte[]) row[column];
+            out.writeShort(col.length);
+            out.write(col);
+          }
+          column++;
+        }
+
+        // complex dimensions
+        while (column < dimColCount) {
+          byte[] value = (byte[])row[column];
+          out.writeShort(value.length);
+          out.write(value);
+          column++;
+        }
+
+        // measures follow the dimensions inside the row
+        for (int mesCount = 0;
+             mesCount < parameters.getMeasureColCount(); mesCount++) {
+          Object value = row[mesCount + dimColCount];
+          if (null == value) {
+            // presence byte only
+            out.write((byte) 0);
+            continue;
+          }
+          out.write((byte) 1);
+          switch (measureTypes[mesCount]) {
+            case SHORT:
+              out.writeShort((Short) value);
+              break;
+            case INT:
+              out.writeInt((Integer) value);
+              break;
+            case LONG:
+              out.writeLong((Long) value);
+              break;
+            case DOUBLE:
+              out.writeDouble((Double) value);
+              break;
+            case DECIMAL:
+              BigDecimal val = (BigDecimal) value;
+              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+              out.writeInt(bigDecimalInBytes.length);
+              out.write(bigDecimalInBytes);
+              break;
+            default:
+              throw new IllegalArgumentException("unsupported data type:" + measureTypes[mesCount]);
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    } finally {
+      // close streams
+      CarbonUtil.closeStreams(out);
+    }
+  }
+
+  /**
+   * Creates the writer used by writeSortTempFile: the writer obtained from the factory is
+   * wrapped in a SortTempFileChunkWriter. The second constructor argument of the wrapper is
+   * the buffer size when prefetch is enabled without compression, and the configured number
+   * of records in compression otherwise.
+   *
+   * @return a new chunk writer
+   */
+  private TempSortFileWriter getWriter() {
+    TempSortFileWriter baseWriter = TempSortFileWriterFactory.getInstance()
+        .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
+            parameters.getDimColCount(), parameters.getComplexDimColCount(),
+            parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
+            parameters.getFileWriteBufferSize());
+
+    if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
+      return new SortTempFileChunkWriter(baseWriter, parameters.getBufferSize());
+    }
+    return new SortTempFileChunkWriter(baseWriter,
+        parameters.getSortTempFileNoOFRecordsInCompression());
+  }
+
+ /**
+ * This method will be used to delete sort temp location is it is exites
+ *
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+ CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
+ }
+
+  /**
+   * Shuts the sorter executor down and waits, for at most two days, until the sort-and-write
+   * tasks that were already submitted have finished.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the thread is interrupted while waiting or
+   *                                          the tasks do not finish in time
+   */
+  private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
+    dataSorterAndWriterExecutorService.shutdown();
+    boolean finished;
+    try {
+      finished = dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+    }
+    if (!finished) {
+      // going on would merge while sort temp files are still being written
+      throw new CarbonSortKeyAndGroupByException(
+          "Sorting and writing of the sort temp files did not finish in time");
+    }
+  }
+
+ /**
+ * Observer class for thread execution
+ * In case of any failure we need stop all the running thread
+ */
+ private class ThreadStatusObserver {
+ /**
+ * Below method will be called if any thread fails during execution
+ *
+ * @param exception
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
+ dataSorterAndWriterExecutorService.shutdownNow();
+ intermediateFileMerger.close();
+ parameters.getObserver().setFailed(true);
+ LOGGER.error(exception);
+ throw new CarbonSortKeyAndGroupByException(exception);
+ }
+ }
+
+  /**
+   * Task that sorts one full row buffer, writes it to a new sort temp file and registers
+   * that file with the intermediate merger. The semaphore permit taken by the submitter is
+   * released when the task ends, whether it succeeded or not.
+   */
+  private class DataSorterAndWriter implements Runnable {
+    private Object[][] recordHolderArray;
+
+    public DataSorterAndWriter(Object[][] recordHolderArray) {
+      this.recordHolderArray = recordHolderArray;
+    }
+
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        if (parameters.getNumberOfNoDictSortColumns() <= 0) {
+          Arrays.sort(recordHolderArray,
+              new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+        } else {
+          Arrays.sort(recordHolderArray,
+              new NewRowComparator(parameters.getNoDictionarySortColumn()));
+        }
+
+        // create a new file and choose folder randomly every time
+        String[] tmpFileLocation = parameters.getTempFileLocation();
+        int pick = new Random().nextInt(tmpFileLocation.length);
+        File sortTempFile = new File(
+            tmpFileLocation[pick] + File.separator + parameters.getTableName() + System
+                .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+        writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
+
+        // hand the file over to the intermediate merger
+        intermediateFileMerger.addFileToMerge(sortTempFile);
+        long elapsed = System.currentTimeMillis() - startTime;
+        LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: "
+            + elapsed);
+      } catch (Throwable e) {
+        // the observer rethrows the failure wrapped; inside a task it can only be logged
+        try {
+          threadStatusObserver.notifyFailed(e);
+        } catch (CarbonSortKeyAndGroupByException ex) {
+          LOGGER.error(ex);
+        }
+      } finally {
+        semaphore.release();
+      }
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
new file mode 100644
index 0000000..d234ce2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * It does mergesort intermediate files to big file.
+ * <p>
+ * Sort temp files are collected through addFileToMerge; once enough of them are pending,
+ * startMergingIfPossible submits a task that merges them into one file.
+ */
+public class SortIntermediateFileMerger {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
+
+  /**
+   * executorService
+   */
+  private ExecutorService executorService;
+  /**
+   * procFiles, files waiting to be merged; accessed under lockObject
+   */
+  private List<File> procFiles;
+
+  private SortParameters parameters;
+
+  private final Object lockObject = new Object();
+
+  public SortIntermediateFileMerger(SortParameters parameters) {
+    this.parameters = parameters;
+    // processed file list
+    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+  }
+
+  /**
+   * Registers a sort temp file as pending for an intermediate merge.
+   *
+   * @param sortTempFile file to merge later
+   */
+  public void addFileToMerge(File sortTempFile) {
+    synchronized (lockObject) {
+      procFiles.add(sortTempFile);
+    }
+  }
+
+  /**
+   * Submits an intermediate merge of all pending files when their number has reached the
+   * configured number of intermediate files to be merged; does nothing otherwise.
+   */
+  public void startMergingIfPossible() {
+    int threshold = parameters.getNumberOfIntermediateFileToBeMerged();
+    File[] fileList;
+    // size check and hand-over happen under one lock, so that two concurrent callers cannot
+    // both pass the check and leave one of them with an empty or undersized list
+    synchronized (lockObject) {
+      if (procFiles.size() < threshold) {
+        return;
+      }
+      fileList = procFiles.toArray(new File[procFiles.size()]);
+      this.procFiles = new ArrayList<File>();
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
+    }
+    startIntermediateMerging(fileList);
+  }
+
+  /**
+   * Below method will be used to start the intermediate file merging. The merged file is
+   * created in a randomly chosen temp location.
+   *
+   * @param intermediateFiles files to merge
+   */
+  private void startIntermediateMerging(File[] intermediateFiles) {
+    int index = new Random().nextInt(parameters.getTempFileLocation().length);
+    String chosenTempDir = parameters.getTempFileLocation()[index];
+    File file = new File(
+        chosenTempDir + File.separator + parameters.getTableName() + System
+            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
+    executorService.execute(merger);
+  }
+
+  /**
+   * Shuts the executor down, waits for at most two days for the submitted merges and then
+   * drops the list of pending files. Files must not be added after this call.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the thread is interrupted while waiting
+   */
+  public void finish() throws CarbonSortKeyAndGroupByException {
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+    }
+    synchronized (lockObject) {
+      procFiles.clear();
+      procFiles = null;
+    }
+  }
+
+  /**
+   * Stops the merges immediately. It is used when sorting has failed.
+   */
+  public void close() {
+    // The former guard only called shutdownNow when the executor was already shut down, so
+    // a running executor was never stopped on failure. shutdownNow is safe to call in every
+    // state and has nothing left to do once the executor has terminated.
+    executorService.shutdownNow();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
new file mode 100644
index 0000000..681e60b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortObserver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.Serializable;
+
+ public class SortObserver implements Serializable {
+   /**
+    * Failure flag; {@code false} until {@link #setFailed(boolean)} is called with {@code true}.
+    */
+   private boolean isFailed;
+
+   /**
+    * Reports the current value of the failure flag.
+    *
+    * @return {@code true} if the flag has been set
+    */
+   public boolean isFailed() {
+     return this.isFailed;
+   }
+
+   /**
+    * Updates the failure flag.
+    *
+    * @param isFailed new value of the flag
+    */
+   public void setFailed(boolean isFailed) {
+     this.isFailed = isFailed;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
new file mode 100644
index 0000000..39e1049
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class SortParameters implements Serializable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SortParameters.class.getName());
+ /**
+ * tempFileLocation
+ */
+ private String[] tempFileLocation;
+ /**
+ * sortBufferSize
+ */
+ private int sortBufferSize;
+ /**
+ * measure count
+ */
+ private int measureColCount;
+ /**
+ * dimension count (complex dimensions excluded)
+ */
+ private int dimColCount;
+ /**
+ * complex dimension count
+ */
+ private int complexDimColCount;
+ /**
+ * fileBufferSize
+ */
+ private int fileBufferSize;
+ /**
+ * numberOfIntermediateFileToBeMerged
+ */
+ private int numberOfIntermediateFileToBeMerged;
+ /**
+ * fileWriteBufferSize
+ */
+ private int fileWriteBufferSize;
+ /**
+ * observer
+ */
+ private SortObserver observer;
+ /**
+ * sortTempFileNoOFRecordsInCompression
+ */
+ private int sortTempFileNoOFRecordsInCompression;
+ /**
+ * isSortTempFileCompressionEnabled
+ */
+ private boolean isSortFileCompressionEnabled;
+ /**
+ * prefetch
+ */
+ private boolean prefetch;
+ /**
+ * bufferSize
+ */
+ private int bufferSize;
+
+ private String databaseName;
+
+ private String tableName;
+
+ private DataType[] measureDataType;
+
+ /**
+ * To know how many columns are of high cardinality.
+ */
+ private int noDictionaryCount;
+ /**
+ * partitionID
+ */
+ private String partitionID;
+ /**
+ * Id of the load folder
+ */
+ private String segmentId;
+ /**
+ * task id, each spark task has a unique id
+ */
+ private String taskNo;
+
+ /**
+ * This will tell whether dimension is dictionary or not.
+ */
+ private boolean[] noDictionaryDimnesionColumn;
+
+ private boolean[] noDictionarySortColumn;
+
+ private int numberOfSortColumns;
+
+ private int numberOfNoDictSortColumns;
+
+ private int numberOfCores;
+
+ private int batchSortSizeinMb;
+
+ /**
+  * Creates a new {@code SortParameters} carrying the same field values as this one.
+  * The copy is shallow: array fields and the observer are shared by reference with the
+  * original instance.
+  *
+  * @return a field-by-field copy of this instance
+  */
+ public SortParameters getCopy() {
+   SortParameters copy = new SortParameters();
+
+   // locations and table identity
+   copy.tempFileLocation = this.tempFileLocation;
+   copy.databaseName = this.databaseName;
+   copy.tableName = this.tableName;
+   copy.partitionID = this.partitionID;
+   copy.segmentId = this.segmentId;
+   copy.taskNo = this.taskNo;
+
+   // column layout
+   copy.measureColCount = this.measureColCount;
+   copy.dimColCount = this.dimColCount;
+   copy.complexDimColCount = this.complexDimColCount;
+   copy.measureDataType = this.measureDataType;
+   copy.noDictionaryCount = this.noDictionaryCount;
+   copy.noDictionaryDimnesionColumn = this.noDictionaryDimnesionColumn;
+   copy.noDictionarySortColumn = this.noDictionarySortColumn;
+   copy.numberOfSortColumns = this.numberOfSortColumns;
+   copy.numberOfNoDictSortColumns = this.numberOfNoDictSortColumns;
+
+   // buffers, merging and compression settings
+   copy.sortBufferSize = this.sortBufferSize;
+   copy.fileBufferSize = this.fileBufferSize;
+   copy.fileWriteBufferSize = this.fileWriteBufferSize;
+   copy.numberOfIntermediateFileToBeMerged = this.numberOfIntermediateFileToBeMerged;
+   copy.sortTempFileNoOFRecordsInCompression = this.sortTempFileNoOFRecordsInCompression;
+   copy.isSortFileCompressionEnabled = this.isSortFileCompressionEnabled;
+   copy.prefetch = this.prefetch;
+   copy.bufferSize = this.bufferSize;
+   copy.batchSortSizeinMb = this.batchSortSizeinMb;
+
+   // execution
+   copy.observer = this.observer;
+   copy.numberOfCores = this.numberOfCores;
+   return copy;
+ }
+
+ public String[] getTempFileLocation() {
+ return tempFileLocation;
+ }
+
+ public void setTempFileLocation(String[] tempFileLocation) {
+ this.tempFileLocation = tempFileLocation;
+ }
+
+ public int getSortBufferSize() {
+ return sortBufferSize;
+ }
+
+ public void setSortBufferSize(int sortBufferSize) {
+ this.sortBufferSize = sortBufferSize;
+ }
+
+ public int getMeasureColCount() {
+ return measureColCount;
+ }
+
+ public void setMeasureColCount(int measureColCount) {
+ this.measureColCount = measureColCount;
+ }
+
+ public int getDimColCount() {
+ return dimColCount;
+ }
+
+ public void setDimColCount(int dimColCount) {
+ this.dimColCount = dimColCount;
+ }
+
+ public int getComplexDimColCount() {
+ return complexDimColCount;
+ }
+
+ public void setComplexDimColCount(int complexDimColCount) {
+ this.complexDimColCount = complexDimColCount;
+ }
+
+ public int getFileBufferSize() {
+ return fileBufferSize;
+ }
+
+ public void setFileBufferSize(int fileBufferSize) {
+ this.fileBufferSize = fileBufferSize;
+ }
+
+ public int getNumberOfIntermediateFileToBeMerged() {
+ return numberOfIntermediateFileToBeMerged;
+ }
+
+ public void setNumberOfIntermediateFileToBeMerged(int numberOfIntermediateFileToBeMerged) {
+ this.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
+ }
+
+ public int getFileWriteBufferSize() {
+ return fileWriteBufferSize;
+ }
+
+ public void setFileWriteBufferSize(int fileWriteBufferSize) {
+ this.fileWriteBufferSize = fileWriteBufferSize;
+ }
+
+ public SortObserver getObserver() {
+ return observer;
+ }
+
+ public void setObserver(SortObserver observer) {
+ this.observer = observer;
+ }
+
+ public int getSortTempFileNoOFRecordsInCompression() {
+ return sortTempFileNoOFRecordsInCompression;
+ }
+
+ public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
+ this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+ }
+
+ public boolean isSortFileCompressionEnabled() {
+ return isSortFileCompressionEnabled;
+ }
+
+ public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
+ isSortFileCompressionEnabled = sortFileCompressionEnabled;
+ }
+
+ public boolean isPrefetch() {
+ return prefetch;
+ }
+
+ public void setPrefetch(boolean prefetch) {
+ this.prefetch = prefetch;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public DataType[] getMeasureDataType() {
+ return measureDataType;
+ }
+
+ public void setMeasureDataType(DataType[] measureDataType) {
+ this.measureDataType = measureDataType;
+ }
+
+ public int getNoDictionaryCount() {
+ return noDictionaryCount;
+ }
+
+ public void setNoDictionaryCount(int noDictionaryCount) {
+ this.noDictionaryCount = noDictionaryCount;
+ }
+
+ public String getPartitionID() {
+ return partitionID;
+ }
+
+ public void setPartitionID(String partitionID) {
+ this.partitionID = partitionID;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ public String getTaskNo() {
+ return taskNo;
+ }
+
+ public void setTaskNo(String taskNo) {
+ this.taskNo = taskNo;
+ }
+
+ public boolean[] getNoDictionaryDimnesionColumn() {
+ return noDictionaryDimnesionColumn;
+ }
+
+ public void setNoDictionaryDimnesionColumn(boolean[] noDictionaryDimnesionColumn) {
+ this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+ }
+
+ public int getNumberOfCores() {
+ return numberOfCores;
+ }
+
+ public void setNumberOfCores(int numberOfCores) {
+ this.numberOfCores = numberOfCores;
+ }
+
+ public int getNumberOfSortColumns() {
+ return numberOfSortColumns;
+ }
+
+ /**
+ * Sets the number of sort columns, capped at the current dimColCount.
+ * Because the cap is read at call time, setDimColCount must be called first;
+ * otherwise the value is clamped against whatever dimColCount holds then (0 by default).
+ *
+ * @param numberOfSortColumns requested number of sort columns
+ */
+ public void setNumberOfSortColumns(int numberOfSortColumns) {
+ this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
+ }
+
+ public boolean[] getNoDictionarySortColumn() {
+ return noDictionarySortColumn;
+ }
+
+ public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
+ this.noDictionarySortColumn = noDictionarySortColumn;
+ }
+
+ public int getNumberOfNoDictSortColumns() {
+ return numberOfNoDictSortColumns;
+ }
+
+ /**
+ * Sets the number of no-dictionary sort columns, capped at the current noDictionaryCount.
+ * Because the cap is read at call time, setNoDictionaryCount must be called first;
+ * otherwise the value is clamped against whatever noDictionaryCount holds then (0 by default).
+ *
+ * @param numberOfNoDictSortColumns requested number of no-dictionary sort columns
+ */
+ public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+ this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount);
+ }
+
+ public int getBatchSortSizeinMb() {
+ return batchSortSizeinMb;
+ }
+
+ public void setBatchSortSizeinMb(int batchSortSizeinMb) {
+ this.batchSortSizeinMb = batchSortSizeinMb;
+ }
+
+ public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
+ SortParameters parameters = new SortParameters();
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ CarbonProperties carbonProperties = CarbonProperties.getInstance();
+ parameters.setDatabaseName(tableIdentifier.getDatabaseName());
+ parameters.setTableName(tableIdentifier.getTableName());
+ parameters.setPartitionID(configuration.getPartitionId());
+ parameters.setSegmentId(configuration.getSegmentId());
+ parameters.setTaskNo(configuration.getTaskNo());
+ parameters.setMeasureColCount(configuration.getMeasureCount());
+ parameters.setDimColCount(
+ configuration.getDimensionCount() - configuration.getComplexColumnCount());
+ parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
+ parameters.setComplexDimColCount(configuration.getComplexColumnCount());
+ parameters.setNoDictionaryDimnesionColumn(
+ CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+ parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
+
+ parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
+ parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
+ setNoDictionarySortColumnMapping(parameters);
+ parameters.setObserver(new SortObserver());
+ // get sort buffer size
+ parameters.setSortBufferSize(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_SIZE,
+ CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
+ LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
+ // set number of intermedaite file to merge
+ parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+ CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
+
+ LOGGER.info("Number of intermediate file to be merged: " + parameters
+ .getNumberOfIntermediateFileToBeMerged());
+
+ // get file buffer size
+ parameters.setFileBufferSize(CarbonDataProcessorUtil
+ .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
+ CarbonCommonConstants.CONSTANT_SIZE_TEN));
+
+ LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
+
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(), configuration.getTaskNo(),
+ configuration.getPartitionId(), configuration.getSegmentId(), false, false);
+ String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
+ File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+
+ parameters.setTempFileLocation(sortTempDirs);
+ LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
+
+ int numberOfCores;
+ try {
+ numberOfCores = Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+ numberOfCores = numberOfCores / 2;
+ } catch (NumberFormatException exc) {
+ numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+ }
+ parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
+
+ parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
+ CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
+
+ parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
+ .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+ CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
+
+ int sortTempFileNoOFRecordsInCompression;
+ try {
+ sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+ if (sortTempFileNoOFRecordsInCompression < 1) {
+ LOGGER.error("Invalid value for: "
+ + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
+ + "be used");
+
+ sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.error(
+ "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ", only Positive Integer value is allowed. Default value will be used");
+
+ sortTempFileNoOFRecordsInCompression = Integer
+ .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+ parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
+
+ if (parameters.isSortFileCompressionEnabled()) {
+ LOGGER.info("Compression will be used for writing the sort temp File");
+ }
+
+ parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
+ parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
+
+ DataType[] measureDataType = configuration.getMeasureDataType();
+ parameters.setMeasureDataType(measureDataType);
+ return parameters;
+ }
+
+ /**
+ * this method will set the boolean mapping for no dictionary sort columns
+ *
+ * @param parameters
+ */
+ private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
+ if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
+ parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
+ } else {
+ boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
+ System
+ .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp, 0,
+ parameters.getNumberOfSortColumns());
+ parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
+ }
+ }
+
+ public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
+ String tableName, int dimColCount, int complexDimColCount, int measureColCount,
+ int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+ boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
+ SortParameters parameters = new SortParameters();
+ CarbonProperties carbonProperties = CarbonProperties.getInstance();
+ parameters.setDatabaseName(databaseName);
+ parameters.setTableName(tableName);
+ parameters.setPartitionID(partitionID);
+ parameters.setSegmentId(segmentId);
+ parameters.setTaskNo(taskNo);
+ parameters.setMeasureColCount(measureColCount);
+ parameters.setDimColCount(dimColCount - complexDimColCount);
+ parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+ parameters.setNoDictionaryCount(noDictionaryCount);
+ parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+ parameters.setComplexDimColCount(complexDimColCount);
+ parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+ parameters.setObserver(new SortObserver());
+ // get sort buffer size
+ parameters.setSortBufferSize(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_SIZE,
+ CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
+ LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
+ // set number of intermedaite file to merge
+ parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+ CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
+
+ LOGGER.info("Number of intermediate file to be merged: " + parameters
+ .getNumberOfIntermediateFileToBeMerged());
+
+ // get file buffer size
+ parameters.setFileBufferSize(CarbonDataProcessorUtil
+ .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
+ CarbonCommonConstants.CONSTANT_SIZE_TEN));
+
+ LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
+
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
+ isCompactionFlow, false);
+ String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
+ File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ parameters.setTempFileLocation(sortTempDirs);
+ LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
+
+ int numberOfCores;
+ try {
+ numberOfCores = Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+ numberOfCores = numberOfCores / 2;
+ } catch (NumberFormatException exc) {
+ numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+ }
+ parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
+
+ parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
+ CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
+
+ parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
+ .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+ CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
+
+ int sortTempFileNoOFRecordsInCompression;
+ try {
+ sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
+ .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+ if (sortTempFileNoOFRecordsInCompression < 1) {
+ LOGGER.error("Invalid value for: "
+ + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
+ + "be used");
+
+ sortTempFileNoOFRecordsInCompression = Integer.parseInt(
+ CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.error(
+ "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+ + ", only Positive Integer value is allowed. Default value will be used");
+
+ sortTempFileNoOFRecordsInCompression = Integer
+ .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+ }
+ parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
+
+ if (parameters.isSortFileCompressionEnabled()) {
+ LOGGER.info("Compression will be used for writing the sort temp File");
+ }
+
+ parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);
+ parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+ CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
+
+ DataType[] type = CarbonDataProcessorUtil
+ .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+ parameters.getTableName());
+ parameters.setMeasureDataType(type);
+ setNoDictionarySortColumnMapping(parameters);
+ return parameters;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
new file mode 100644
index 0000000..c4b0b31
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());
+
+ /**
+ * temp file
+ */
+ private File tempFile;
+
+ /**
+ * read stream
+ */
+ private DataInputStream stream;
+
+ /**
+ * entry count
+ */
+ private int entryCount;
+
+ /**
+ * number record read
+ */
+ private int numberOfObjectRead;
+
+ /**
+ * return row
+ */
+ private Object[] returnRow;
+
+ /**
+ * number of measures
+ */
+ private int measureCount;
+
+ /**
+ * number of dimensionCount
+ */
+ private int dimensionCount;
+
+ /**
+ * number of complexDimensionCount
+ */
+ private int complexDimensionCount;
+
+ /**
+ * fileBufferSize for file reader stream size
+ */
+ private int fileBufferSize;
+
+ private Object[][] currentBuffer;
+
+ private Object[][] backupBuffer;
+
+ private boolean isBackupFilled;
+
+ private boolean prefetch;
+
+ private int bufferSize;
+
+ private int bufferRowCounter;
+
+ private ExecutorService executorService;
+
+ private Future<Void> submit;
+
+ private int prefetchRecordsProceesed;
+
+ /**
+ * sortTempFileNoOFRecordsInCompression
+ */
+ private int sortTempFileNoOFRecordsInCompression;
+
+ /**
+ * isSortTempFileCompressionEnabled
+ */
+ private boolean isSortTempFileCompressionEnabled;
+
+ /**
+ * totalRecordFetch
+ */
+ private int totalRecordFetch;
+
+ private int noDictionaryCount;
+
+ private DataType[] aggType;
+
+ /**
+ * to store whether dimension is of dictionary type or not
+ */
+ private boolean[] isNoDictionaryDimensionColumn;
+
+ /**
+ * to store whether sort column is of dictionary type or not
+ */
+ private boolean[] isNoDictionarySortColumn;
+
+ /**
+ * Constructor to initialize
+ *
+ * @param tempFile
+ * @param dimensionCount
+ * @param complexDimensionCount
+ * @param measureCount
+ * @param fileBufferSize
+ * @param noDictionaryCount
+ * @param aggType
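+ * @param isNoDictionaryDimensionColumn whether each dimension is a no-dictionary column
+ * @param isNoDictionarySortColumn whether each sort column is a no-dictionary column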
+ */
+ public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
+     int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
+     boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
+   // temp file to read and the buffer size of the stream opened on it
+   this.tempFile = tempFile;
+   this.fileBufferSize = fileBufferSize;
+
+   // column counts describing the layout of each stored row
+   this.dimensionCount = dimensionCount;
+   this.complexDimensionCount = complexDimensionCount;
+   this.noDictionaryCount = noDictionaryCount;
+   this.measureCount = measureCount;
+   this.aggType = aggType;
+
+   // per-column no-dictionary flags
+   this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
+   this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+
+   // single worker thread that runs the DataFetcher tasks submitted by this holder
+   this.executorService = Executors.newFixedThreadPool(1);
+ }
+
+ /**
+ * This method will be used to initialize
+ *
+ * @throws CarbonSortKeyAndGroupByException problem while initializing
+ */
+ public void initialize() throws CarbonSortKeyAndGroupByException {
+   CarbonProperties properties = CarbonProperties.getInstance();
+
+   // prefetch switch and prefetch buffer size
+   prefetch = Boolean.parseBoolean(properties.getProperty(
+       CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
+       CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
+   bufferSize = Integer.parseInt(properties.getProperty(
+       CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
+       CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
+
+   // sort temp file compression switch
+   this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(properties.getProperty(
+       CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+       CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
+   if (this.isSortTempFileCompressionEnabled) {
+     LOGGER.info("Compression was used while writing the sortTempFile");
+   }
+
+   // records per compressed block; anything that is not a positive integer falls back to
+   // the default value
+   int recordsPerBlock;
+   try {
+     recordsPerBlock = Integer.parseInt(properties.getProperty(
+         CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+         CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+     if (recordsPerBlock < 1) {
+       LOGGER.error("Invalid value for: "
+           + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+           + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
+           + " be used");
+
+       recordsPerBlock = Integer.parseInt(
+           CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+     }
+   } catch (NumberFormatException e) {
+     LOGGER.error(
+         "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+             + ", only Positive Integer value is allowed.Default value will be used");
+     recordsPerBlock = Integer.parseInt(
+         CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+   }
+   this.sortTempFileNoOFRecordsInCompression = recordsPerBlock;
+
+   // open the file and load the first buffer
+   initialise();
+ }
+
+ /**
+  * Opens the temp file, reads the entry count from its start and, depending on the
+  * prefetch and compression flags, loads the first buffer of rows. With prefetch enabled a
+  * background fetch of the next buffer is submitted when more records remain.
+  * <p>
+  * If anything fails after the stream has been opened, the stream is closed again so that
+  * a failed initialisation does not leak the file handle.
+  *
+  * @throws CarbonSortKeyAndGroupByException if the file cannot be opened or read
+  */
+ private void initialise() throws CarbonSortKeyAndGroupByException {
+   boolean initialised = false;
+   try {
+     if (isSortTempFileCompressionEnabled) {
+       // with compression the buffer holds exactly one compressed block of records
+       this.bufferSize = sortTempFileNoOFRecordsInCompression;
+     }
+     stream = new DataInputStream(
+         new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+     this.entryCount = stream.readInt();
+     if (prefetch) {
+       // load the first buffer synchronously, then prefetch the next one in the background
+       new DataFetcher(false).call();
+       totalRecordFetch += currentBuffer.length;
+       if (totalRecordFetch < this.entryCount) {
+         submit = executorService.submit(new DataFetcher(true));
+       }
+     } else {
+       if (isSortTempFileCompressionEnabled) {
+         new DataFetcher(false).call();
+       }
+     }
+     initialised = true;
+   } catch (FileNotFoundException e) {
+     LOGGER.error(e);
+     throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
+   } catch (IOException e) {
+     LOGGER.error(e);
+     throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
+   } catch (Exception e) {
+     LOGGER.error(e);
+     throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+   } finally {
+     // No background fetch can be pending on a failure path (submitting it is the last
+     // step of the try block), so closing the stream here cannot race with a reader.
+     if (!initialised && stream != null) {
+       try {
+         stream.close();
+       } catch (IOException closeError) {
+         LOGGER.error(closeError);
+       }
+     }
+   }
+ }
+
+ /**
+ * This method will be used to read new row from file
+ *
+ * @throws CarbonSortKeyAndGroupByException problem while reading
+ */
+ public void readRow() throws CarbonSortKeyAndGroupByException {
+   // prefetch mode: rows come from the double-buffered prefetch path
+   if (prefetch) {
+     fillDataForPrefetch();
+     return;
+   }
+   // plain mode: read the next row straight from the stream
+   if (!isSortTempFileCompressionEnabled) {
+     this.returnRow = getRowFromStream();
+     return;
+   }
+   // compression without prefetch: refill the buffer once it has been consumed
+   if (bufferRowCounter >= bufferSize) {
+     try {
+       new DataFetcher(false).call();
+       bufferRowCounter = 0;
+     } catch (Exception e) {
+       LOGGER.error(e);
+       throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
+     }
+   }
+   prefetchRecordsProceesed++;
+   returnRow = currentBuffer[bufferRowCounter++];
+ }
+
+ /**
+  * Serves the next row from the current buffer in prefetch mode.
+  * <p>
+  * When the current buffer has been fully consumed, the backup buffer takes its
+  * place. If the backup buffer is not marked as filled yet, the pending fetch task
+  * is awaited first. After the swap a new fetch task is submitted as long as the
+  * file still holds unread records.
+  */
+ private void fillDataForPrefetch() {
+   if (bufferRowCounter >= bufferSize) {
+     if (!isBackupFilled) {
+       // the background fetch has not reported completion yet: wait for it
+       try {
+         submit.get();
+       } catch (Exception e) {
+         LOGGER.error(e);
+       }
+     }
+     // promote the backup buffer and start consuming it from its first row
+     bufferRowCounter = 0;
+     currentBuffer = backupBuffer;
+     totalRecordFetch += currentBuffer.length;
+     isBackupFilled = false;
+     boolean moreRecordsInFile = totalRecordFetch < this.entryCount;
+     if (moreRecordsInFile) {
+       submit = executorService.submit(new DataFetcher(true));
+     }
+   }
+   prefetchRecordsProceesed++;
+   returnRow = currentBuffer[bufferRowCounter++];
+ }
+
+ /**
+  * Reads one row from the sort temp file stream.
+  * <p>
+  * The values are consumed in this order: one entry per dimension column (a
+  * short-length-prefixed byte array for a no dictionary column, an int otherwise),
+  * then one short-length-prefixed byte array per complex dimension, then one entry
+  * per measure. A measure starts with a flag byte; only when the flag is 1 is it
+  * followed by a value of the measure's data type, otherwise the measure is null.
+  *
+  * @return Object[] holder of size 3 that NonDictionaryUtil.prepareOutObj fills from
+  *         the int dimensions, the byte array dimensions and the measures
+  * @throws CarbonSortKeyAndGroupByException if reading from the stream fails
+  */
+ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
+   // row of size 3 (1 for dims , 1 for high card , 1 for measures)
+   Object[] row = new Object[3];
+   int[] dictionaryDims = new int[this.dimensionCount - this.noDictionaryCount];
+   byte[][] byteArrayDims = new byte[this.noDictionaryCount + this.complexDimensionCount][];
+   Object[] measureValues = new Object[this.measureCount];
+   int dictionaryPos = 0;
+   int byteArrayPos = 0;
+   try {
+     // dimension values
+     for (boolean noDictionary : isNoDictionaryDimensionColumn) {
+       if (!noDictionary) {
+         dictionaryDims[dictionaryPos++] = stream.readInt();
+         continue;
+       }
+       byte[] value = new byte[stream.readShort()];
+       stream.readFully(value);
+       byteArrayDims[byteArrayPos++] = value;
+     }
+
+     // complex dimensions are stored after the no dictionary values in the same array
+     for (int complexPos = 0; complexPos < complexDimensionCount; complexPos++) {
+       byte[] value = new byte[stream.readShort()];
+       stream.readFully(value);
+       byteArrayDims[byteArrayPos++] = value;
+     }
+
+     // measure values, each guarded by a flag byte
+     for (int measurePos = 0; measurePos < this.measureCount; measurePos++) {
+       if (stream.readByte() != 1) {
+         measureValues[measurePos] = null;
+         continue;
+       }
+       switch (aggType[measurePos]) {
+         case SHORT:
+           measureValues[measurePos] = stream.readShort();
+           break;
+         case INT:
+           measureValues[measurePos] = stream.readInt();
+           break;
+         case LONG:
+           measureValues[measurePos] = stream.readLong();
+           break;
+         case DOUBLE:
+           measureValues[measurePos] = stream.readDouble();
+           break;
+         case DECIMAL:
+           byte[] decimalBytes = new byte[stream.readInt()];
+           stream.readFully(decimalBytes);
+           measureValues[measurePos] = DataTypeUtil.byteToBigDecimal(decimalBytes);
+           break;
+         default:
+           throw new IllegalArgumentException("unsupported data type:" + aggType[measurePos]);
+       }
+     }
+
+     NonDictionaryUtil.prepareOutObj(row, dictionaryDims, byteArrayDims, measureValues);
+
+     // one more record has been read from the file
+     this.numberOfObjectRead++;
+   } catch (IOException e) {
+     LOGGER.error("Problme while reading the madkey fom sort temp file");
+     throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
+   }
+
+   return row;
+ }
+
+ /**
+  * Returns the row most recently loaded by {@link #readRow()}.
+  *
+  * @return the current row
+  */
+ public Object[] getRow() {
+   return returnRow;
+ }
+
+ /**
+  * Tells whether the file still holds rows that have not been handed out.
+  * <p>
+  * The buffered modes (prefetch or compression) count the rows served from the
+  * buffers; the plain streaming mode counts the rows read from the stream.
+  *
+  * @return true when more rows are present in the file
+  */
+ public boolean hasNext() {
+   if (!prefetch && !isSortTempFileCompressionEnabled) {
+     return this.numberOfObjectRead < this.entryCount;
+   }
+   return this.prefetchRecordsProceesed < this.entryCount;
+ }
+
+ /**
+  * Releases what this holder keeps open: closes the file stream, shuts down the
+  * executor service and drops the references to both row buffers.
+  */
+ public void closeStream() {
+   CarbonUtil.closeStreams(stream);
+   executorService.shutdown();
+   // let the buffered rows become eligible for garbage collection
+   currentBuffer = null;
+   backupBuffer = null;
+ }
+
+ /**
+  * Returns the number of entries recorded for the sort temp file.
+  *
+  * @return entryCount
+  */
+ public int getEntryCount() {
+   return this.entryCount;
+ }
+
+ /**
+  * Orders two chunk holders by their current rows, one sort column at a time.
+  * <p>
+  * For a no dictionary sort column the byte arrays held in slot 1 of the rows are
+  * compared with {@link UnsafeComparer}; for any other sort column the int values
+  * held in slot 0 are compared. The first column that differs decides the result.
+  * <p>
+  * The int values are compared with {@link Integer#compare(int, int)} rather than
+  * by subtraction, because a subtraction can overflow and yield the wrong sign.
+  *
+  * @param other holder whose current row is compared with this holder's current row
+  * @return a negative value, zero or a positive value when this row sorts before,
+  *         equal to or after the other row
+  */
+ @Override public int compareTo(SortTempFileChunkHolder other) {
+   int[] leftMdkArray = (int[]) returnRow[0];
+   int[] rightMdkArray = (int[]) other.returnRow[0];
+   byte[][] leftNonDictArray = (byte[][]) returnRow[1];
+   byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
+   int index = 0;
+   int noDictionaryIndex = 0;
+   for (boolean isNoDictionary : isNoDictionarySortColumn) {
+     int diff;
+     if (isNoDictionary) {
+       diff = UnsafeComparer.INSTANCE
+           .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
+       noDictionaryIndex++;
+     } else {
+       diff = Integer.compare(leftMdkArray[index], rightMdkArray[index]);
+       index++;
+     }
+     if (diff != 0) {
+       return diff;
+     }
+   }
+   // every sort column matched
+   return 0;
+ }
+
+ /**
+  * Identity based equality: a holder is only equal to itself.
+  *
+  * @param obj object to compare with
+  * @return true only when obj is this very instance
+  */
+ @Override public boolean equals(Object obj) {
+   // any other object, of this type or not, is a different instance and therefore
+   // never equal, so the reference check alone decides
+   return this == obj;
+ }
+
+ /**
+  * Derives the hash from the column counts and the temp file.
+  *
+  * @return 31 times the sum of the measure, dimension, complex dimension and
+  *         no dictionary counts, plus the hash of the temp file
+  */
+ @Override public int hashCode() {
+   int columnCounts = measureCount + dimensionCount + complexDimensionCount + noDictionaryCount;
+   // int arithmetic wraps around, so factoring out 31 gives the same value as
+   // adding the four products one by one
+   return 31 * columnCounts + tempFile.hashCode();
+ }
+
+ /**
+  * Task that reads the next batch of rows from the sort temp file into either the
+  * current buffer or the backup buffer of the enclosing holder.
+  * <p>
+  * The batch size is fixed when the task is created: the buffer size, capped by the
+  * number of records not yet accounted for in totalRecordFetch.
+  */
+ private final class DataFetcher implements Callable<Void> {
+   /** true when the batch goes to the backup buffer, false for the current buffer */
+   private final boolean isBackUpFilling;
+
+   /** number of rows this task reads */
+   private final int numberOfRecords;
+
+   private DataFetcher(boolean backUp) {
+     isBackUpFilling = backUp;
+     numberOfRecords = calculateNumberOfRecordsToBeFetched();
+   }
+
+   /**
+    * @return the buffer size, or the number of records left to be read when that
+    *         is smaller
+    */
+   private int calculateNumberOfRecordsToBeFetched() {
+     int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
+     return Math.min(bufferSize, numberOfRecordsLeftToBeRead);
+   }
+
+   /**
+    * Reads the batch and publishes it in the target buffer. The backup buffer is
+    * only marked as filled after it has been assigned.
+    *
+    * @return always null
+    * @throws Exception when the rows cannot be read. The failure is logged and then
+    *                   rethrown: swallowing it would leave the previous buffer in
+    *                   place and let callers carry on as if the fetch had succeeded
+    */
+   @Override public Void call() throws Exception {
+     try {
+       if (isBackUpFilling) {
+         backupBuffer = prefetchRecordsFromFile(numberOfRecords);
+         isBackupFilled = true;
+       } else {
+         currentBuffer = prefetchRecordsFromFile(numberOfRecords);
+       }
+     } catch (Exception e) {
+       LOGGER.error(e);
+       throw e;
+     }
+     return null;
+   }
+
+ }
+
+ /**
+  * Reads the given number of rows from the sort temp file and returns them as one
+  * buffer.
+  *
+  * @param numberOfRecords number of rows to read
+  * @return array holding the rows in the order they were read
+  * @throws CarbonSortKeyAndGroupByException if a row cannot be read
+  */
+ private Object[][] prefetchRecordsFromFile(int numberOfRecords)
+     throws CarbonSortKeyAndGroupByException {
+   Object[][] batch = new Object[numberOfRecords][];
+   int filled = 0;
+   while (filled < numberOfRecords) {
+     batch[filled++] = getRowFromStream();
+   }
+   return batch;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
new file mode 100644
index 0000000..025aef8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class SortTempFileChunkWriter implements TempSortFileWriter {
+ /**
+ * writer
+ */
+ private TempSortFileWriter writer;
+
+ /**
+ * recordPerLeaf
+ */
+ private int recordPerLeaf;
+
+ /**
+ * CarbonCompressedSortTempFileChunkWriter
+ *
+ * @param writer
+ */
+ public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) {
+ this.writer = writer;
+ this.recordPerLeaf = recordPerLeaf;
+ }
+
+ /**
+ * initialize
+ */
+ public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException {
+ this.writer.initiaize(file, entryCount);
+ }
+
+ /**
+ * finish
+ */
+ public void finish() {
+ this.writer.finish();
+ }
+
+ /**
+ * Below method will be used to write the sort temp file chunk by chunk
+ */
+ public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+ int recordCount = 0;
+ Object[][] tempRecords;
+ while (recordCount < records.length) {
+ if (records.length - recordCount < recordPerLeaf) {
+ recordPerLeaf = records.length - recordCount;
+ }
+ tempRecords = new Object[recordPerLeaf][];
+ System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf);
+ recordCount += recordPerLeaf;
+ this.writer.writeSortTempFile(tempRecords);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
new file mode 100644
index 0000000..0de9af7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+/**
+ * Contract for readers of sort temp files.
+ */
+public interface TempSortFileReader {
+ /**
+ * Closes the file holder used by this reader.
+ */
+ void finish();
+
+ /**
+ * Returns rows read from the temp file.
+ *
+ * @return two-dimensional array of row data; presumably one Object[] per row -
+ * confirm against the implementations
+ */
+ Object[][] getRow();
+
+ /**
+ * Returns the total row count in the temp file.
+ *
+ * @return total number of rows
+ */
+ int getEntryCount();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
new file mode 100644
index 0000000..4e4a8e7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.File;
+
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Contract for writers of sort temp files: initialize, write records, finish.
+ */
+public interface TempSortFileWriter {
+ /**
+ * Initializes the writer. The method name is misspelled, but it is part of the
+ * contract that implementations such as SortTempFileChunkWriter declare, so it
+ * cannot be corrected here alone.
+ *
+ * @param file file handed to the writer; presumably the sort temp file to be
+ * written - confirm against the implementations
+ * @param entryCount entry count handed to the writer; presumably the number of
+ * records the file will hold - confirm against the implementations
+ * @throws CarbonSortKeyAndGroupByException if the writer cannot be initialized
+ */
+ void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException;
+
+ /**
+ * Finishes the writer.
+ */
+ void finish();
+
+ /**
+ * Writes the given records to the sort temp file.
+ *
+ * @param records records to write
+ * @throws CarbonSortKeyAndGroupByException if the records cannot be written
+ */
+ void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
new file mode 100644
index 0000000..259ab9f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+/**
+ * Singleton factory that creates the {@link TempSortFileWriter} matching the
+ * requested compression setting.
+ */
+public final class TempSortFileWriterFactory {
+  /**
+   * the one shared factory instance
+   */
+  private static final TempSortFileWriterFactory INSTANCE = new TempSortFileWriterFactory();
+
+  private TempSortFileWriterFactory() {
+    // instances are only handed out through getInstance()
+  }
+
+  /**
+   * @return the shared factory instance
+   */
+  public static TempSortFileWriterFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates a sort temp file writer.
+   *
+   * @param isCompressionEnabled  true for a CompressedTempSortFileWriter, false for
+   *                              an UnCompressedTempSortFileWriter
+   * @param dimensionCount        passed on to the created writer
+   * @param complexDimensionCount passed on to the created writer
+   * @param measureCount          passed on to the created writer
+   * @param noDictionaryCount     passed on to the created writer
+   * @param writeBufferSize       passed on to the created writer
+   * @return a new writer of the selected kind
+   */
+  public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount,
+      int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) {
+    if (!isCompressionEnabled) {
+      return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount,
+          measureCount, noDictionaryCount, writeBufferSize);
+    }
+    return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
+        noDictionaryCount, writeBufferSize);
+  }
+}