You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:12 UTC
[25/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
new file mode 100644
index 0000000..40fe8d5
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Sort temp file writer that serializes each block of records without compression.
+ */
+public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
+
+  /**
+   * Creates an uncompressed sort temp file writer.
+   *
+   * @param dimensionCount        number of dimensions, each written as an int
+   * @param complexDimensionCount number of complex dimensions, each written as length + bytes
+   * @param measureCount          number of measures, each written as a presence byte (+ double)
+   * @param noDictionaryCount     number of no-dictionary dimensions
+   * @param writeBufferSize       write buffer size, passed to the superclass
+   */
+  public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
+      int measureCount, int noDictionaryCount, int writeBufferSize) {
+    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
+  }
+
+  /**
+   * Serializes {@code records} into {@code dataOutputStream}. For each row this writes, in
+   * order: one int per dimension; the no-dictionary byte array (only when
+   * {@code noDictionaryCount > 0}); each complex dimension as an int length followed by its
+   * bytes; and each measure as a presence byte (1 = present, 0 = null) followed by a double
+   * when present.
+   *
+   * @throws IOException if writing to {@code dataOutputStream} fails
+   */
+  public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream,
+      int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount)
+      throws IOException {
+    for (Object[] row : records) {
+      int fieldIndex = 0;
+      for (int counter = 0; counter < dimensionCount; counter++) {
+        dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
+      }
+
+      // byte[] of the no-dictionary (high cardinality) dimensions
+      if (noDictionaryCount > 0) {
+        dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
+      }
+
+      // NOTE(review): complex dimensions are read from row[0..] directly and the measure index
+      // below continues from complexDimensionCount; confirm against the caller's row layout.
+      fieldIndex = 0;
+      for (int counter = 0; counter < complexDimensionCount; counter++) {
+        byte[] complexBytes = (byte[]) row[fieldIndex++];
+        dataOutputStream.writeInt(complexBytes.length);
+        dataOutputStream.write(complexBytes);
+      }
+
+      for (int counter = 0; counter < measureCount; counter++) {
+        // Null-check the value that is actually written; checking a different slot could
+        // leave a null Double to be unboxed by writeDouble.
+        Object measure = NonDictionaryUtil.getMeasure(fieldIndex++, row);
+        if (null != measure) {
+          dataOutputStream.write((byte) 1);
+          dataOutputStream.writeDouble((Double) measure);
+        } else {
+          dataOutputStream.write((byte) 0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes one block of records to the sort temp file: the record count, the byte length of the
+   * serialized block, then the serialized bytes produced by {@link #writeDataOutputStream}.
+   *
+   * @param records rows to write
+   * @throws CarbonSortKeyAndGroupByException if serialization or writing fails
+   */
+  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+    // Only an initial capacity estimate; ByteArrayOutputStream grows as needed.
+    int recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE)
+        + (dimensionCount * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    ByteArrayOutputStream blockDataArray = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      blockDataArray = new ByteArrayOutputStream(records.length * recordSize);
+      dataOutputStream = new DataOutputStream(blockDataArray);
+      writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
+          noDictionaryCount, complexDimensionCount);
+      dataOutputStream.flush();
+      stream.writeInt(records.length);
+      byte[] byteArray = blockDataArray.toByteArray();
+      stream.writeInt(byteArray.length);
+      stream.write(byteArray);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    } finally {
+      CarbonUtil.closeStreams(dataOutputStream);
+      CarbonUtil.closeStreams(blockDataArray);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
deleted file mode 100644
index d4e4c35..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/exception/CarbonSortKeyAndGroupByException.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.exception;
-
-import java.util.Locale;
-
-/**
- * Checked exception used by the sort temp file writers and the intermediate file merger.
- */
-public class CarbonSortKeyAndGroupByException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The error message returned by {@link #getMessage()}.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonSortKeyAndGroupByException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   * @param t   The cause of this exception.
-   */
-  public CarbonSortKeyAndGroupByException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor that wraps a cause. The message is the cause's {@code toString()}, so
-   * {@link #getMessage()} reports the underlying failure instead of an empty string.
-   *
-   * @param t The cause of this exception; when null the message stays empty.
-   */
-  public CarbonSortKeyAndGroupByException(Throwable t) {
-    super(t);
-    // Throwable(Throwable) derives its detail message from the cause; keep it.
-    String causeMessage = super.getMessage();
-    if (causeMessage != null) {
-      this.msg = causeMessage;
-    }
-  }
-
-  /**
-   * This method is used to get the localized message.
-   * NOTE(review): always returns an empty string, whatever the locale.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * @return the error message; for the cause-only constructor, the cause's description
-   */
-  @Override public String getMessage() {
-    return this.msg;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
deleted file mode 100644
index bd2ccec..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/AbstractTempSortFileWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Base class for sort temp file writers: holds the column counts and the output stream that
- * {@link #initiaize(File, int)} opens.
- */
-public abstract class AbstractTempSortFileWriter implements TempSortFileWriter {
-
-  /**
-   * Buffer size of the BufferedOutputStream wrapping the temp file.
-   */
-  protected int writeBufferSize;
-
-  /**
-   * Measure count
-   */
-  protected int measureCount;
-
-  /**
-   * Dimension count
-   */
-  protected int dimensionCount;
-
-  /**
-   * Complex dimension count
-   */
-  protected int complexDimensionCount;
-
-  /**
-   * Output stream of the temp file; set only once {@link #initiaize(File, int)} succeeds.
-   */
-  protected DataOutputStream stream;
-
-  /**
-   * No-dictionary dimension count
-   */
-  protected int noDictionaryCount;
-
-  /**
-   * @param dimensionCount        dimension count
-   * @param complexDimensionCount complex dimension count
-   * @param measureCount          measure count
-   * @param noDictionaryCount     no-dictionary dimension count
-   * @param writeBufferSize       buffer size used when opening the temp file
-   */
-  public AbstractTempSortFileWriter(int dimensionCount, int complexDimensionCount, int measureCount,
-      int noDictionaryCount, int writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * Opens {@code file} for buffered writing and writes {@code entryCount} as the first int.
-   * On an IOException (including {@link FileNotFoundException}) the underlying file stream is
-   * closed before the failure is reported.
-   *
-   * @param file       sort temp file to create
-   * @param entryCount value written as the file header
-   * @throws CarbonSortKeyAndGroupByException if the file cannot be opened or written
-   */
-  @Override public void initiaize(File file, int entryCount)
-      throws CarbonSortKeyAndGroupByException {
-    FileOutputStream fileStream = null;
-    try {
-      fileStream = new FileOutputStream(file);
-      DataOutputStream opened =
-          new DataOutputStream(new BufferedOutputStream(fileStream, writeBufferSize));
-      opened.writeInt(entryCount);
-      stream = opened;
-    } catch (IOException e) {
-      // release the file handle instead of leaving a half-initialized stream behind
-      CarbonUtil.closeStreams(fileStream);
-      throw new CarbonSortKeyAndGroupByException(e);
-    }
-  }
-
-  /**
-   * Closes the temp file stream.
-   */
-  @Override public void finish() {
-    CarbonUtil.closeStreams(stream);
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
deleted file mode 100644
index e4c851a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/CompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Sort temp file writer that compresses each serialized block of records before writing it.
- */
-public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * @param dimensionCount        dimension count
-   * @param complexDimensionCount complex dimension count
-   * @param measureCount          measure count
-   * @param noDictionaryCount     no-dictionary dimension count
-   * @param writeBufferSize       buffer size passed to the superclass
-   */
-  public CompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
-  }
-
-  /**
-   * Serializes {@code records} with {@link UnCompressedTempSortFileWriter#writeDataOutputStream},
-   * compresses the bytes with the compressor from {@link CompressorFactory}, then writes the
-   * record count, the compressed length and the compressed bytes to the temp file stream.
-   *
-   * @param records rows to write
-   * @throws CarbonSortKeyAndGroupByException if serialization or writing fails
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    final int bytesPerRecord = measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE
-        + dimensionCount * CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    ByteArrayOutputStream rawBlock = null;
-    DataOutputStream rawOut = null;
-    try {
-      rawBlock = new ByteArrayOutputStream(records.length * bytesPerRecord);
-      rawOut = new DataOutputStream(rawBlock);
-      UnCompressedTempSortFileWriter.writeDataOutputStream(records, rawOut, measureCount,
-          dimensionCount, noDictionaryCount, complexDimensionCount);
-      stream.writeInt(records.length);
-      byte[] compressed =
-          CompressorFactory.getInstance().getCompressor().compressByte(rawBlock.toByteArray());
-      stream.writeInt(compressed.length);
-      stream.write(compressed);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(rawBlock);
-      CarbonUtil.closeStreams(rawOut);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
deleted file mode 100644
index 7c6a889..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.AbstractQueue;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Merges a set of sorted intermediate sort temp files into one sorted output file using a
- * priority queue of {@link SortTempFileChunkHolder}s. On success the input files are deleted;
- * on failure the open input streams are closed and the partial output file is removed.
- */
-public class IntermediateFileMerger implements Runnable {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(IntermediateFileMerger.class.getName());
-
-  /**
-   * Heap of chunk holders, one per input file that still has rows to read.
-   */
-  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeap;
-
-  /**
-   * Number of input files not yet exhausted.
-   */
-  private int fileCounter;
-
-  /**
-   * Output stream used when neither compression nor prefetch is enabled.
-   */
-  private DataOutputStream stream;
-
-  /**
-   * Total number of records across all input files.
-   */
-  private int totalNumberOfRecords;
-
-  /**
-   * Rows buffered for {@link #writer} (compression / prefetch mode only).
-   */
-  private Object[][] records;
-
-  /**
-   * Number of rows currently buffered in {@link #records}.
-   */
-  private int entryCount;
-
-  /**
-   * Writer used when compression or prefetch is enabled.
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * Capacity of {@link #records}; the buffer is handed to {@link #writer} when it is full.
-   */
-  private int totalSize;
-
-  private SortParameters mergerParameters;
-
-  private File[] intermediateFiles;
-
-  private File outPutFile;
-
-  /**
-   * Per dimension, {@code true} when it is a no-dictionary column.
-   */
-  private boolean[] noDictionarycolumnMapping;
-
-  /**
-   * @param mergerParameters  sort parameters (column counts, buffer sizes, compression flags)
-   * @param intermediateFiles sorted input files to merge
-   * @param outPutFile        file the merged rows are written to
-   */
-  public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
-      File outPutFile) {
-    this.mergerParameters = mergerParameters;
-    this.fileCounter = intermediateFiles.length;
-    this.intermediateFiles = intermediateFiles;
-    this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
-  }
-
-  /**
-   * Merges all intermediate files into {@link #outPutFile} and logs the elapsed time. Errors are
-   * logged, not rethrown.
-   */
-  @Override
-  public void run() {
-    long intermediateMergeStartTime = System.currentTimeMillis();
-    int fileCountAtStart = fileCounter;
-    boolean isFailed = false;
-    try {
-      startSorting();
-      initialize();
-      while (hasNext()) {
-        writeDataTofile(next());
-      }
-      flushBufferedRecords();
-      double intermediateMergeCostTime =
-          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + fileCountAtStart +
-          " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
-    } catch (Exception e) {
-      LOGGER.error(e, "Problem while intermediate merging");
-      isFailed = true;
-    } finally {
-      records = null;
-      CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
-      if (!isFailed) {
-        try {
-          finish();
-        } catch (CarbonSortKeyAndGroupByException e) {
-          LOGGER.error(e, "Problem while deleting the intermediate files");
-        }
-      } else {
-        // the chunk holders still in the heap own open input streams
-        try {
-          closeRecordHolders();
-        } catch (CarbonSortKeyAndGroupByException e) {
-          LOGGER.error(e, "Problem while closing the sort temp file streams");
-        }
-        // drop the partial output; exists() avoids an error when it was never created
-        if (outPutFile.exists() && !outPutFile.delete()) {
-          LOGGER.error("Problem while deleting the merge file");
-        }
-      }
-    }
-  }
-
-  /**
-   * Hands any rows still buffered for {@link #writer} to it (compression / prefetch mode only).
-   *
-   * @throws CarbonSortKeyAndGroupByException if the writer fails
-   */
-  private void flushBufferedRecords() throws CarbonSortKeyAndGroupByException {
-    boolean buffered =
-        mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch();
-    if (!buffered || entryCount <= 0) {
-      return;
-    }
-    if (entryCount < totalSize) {
-      // write only the filled prefix of the buffer
-      Object[][] temp = new Object[entryCount][];
-      System.arraycopy(records, 0, temp, 0, entryCount);
-      this.writer.writeSortTempFile(temp);
-    } else {
-      this.writer.writeSortTempFile(records);
-    }
-  }
-
-  /**
-   * Opens the output. Without compression or prefetch this is a buffered
-   * {@link DataOutputStream} headed by the total record count; otherwise it is a
-   * {@link TempSortFileWriter} from {@link TempSortFileWriterFactory}, and {@link #totalSize}
-   * is set to the number of rows batched per write.
-   *
-   * @throws CarbonSortKeyAndGroupByException if the output cannot be opened or written
-   */
-  private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
-
-      if (mergerParameters.isPrefetch()) {
-        totalSize = mergerParameters.getBufferSize();
-      } else {
-        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
-      }
-    }
-  }
-
-  /**
-   * Polls the holder with the smallest current row from the heap, returns that row, and puts
-   * the holder back after reading its next row (or closes it when exhausted).
-   *
-   * @return sorted record
-   * @throws CarbonSortKeyAndGroupByException if reading the next row fails
-   */
-  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-    Object[] row = null;
-
-    // poll the top object from the heap; the heap is ordered by the holders' comparison and
-    // polling removes the root and restores the heap in O(log n)
-    SortTempFileChunkHolder poll = this.recordHolderHeap.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there is no entry present
-    if (!poll.hasNext()) {
-      // if chunk is empty then close the stream
-      poll.closeStream();
-
-      // change the file counter
-      --this.fileCounter;
-
-      // return row
-      return row;
-    }
-
-    // read new row
-    poll.readRow();
-
-    // add to heap
-    this.recordHolderHeap.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * Starts the sorting process: creates the record holder heap, opens a chunk holder for every
-   * intermediate file, reads its first record, adds it to the heap and sums the entry counts
-   * into {@link #totalNumberOfRecords}.
-   *
-   * @throws CarbonSortKeyAndGroupByException if a file cannot be opened or read
-   */
-  private void startSorting() throws CarbonSortKeyAndGroupByException {
-    LOGGER.info("Number of temp file: " + this.fileCounter);
-
-    // create record holder heap
-    createRecordHolderQueue(intermediateFiles);
-
-    // iterate over file list and create chunk holder and add to heap
-    LOGGER.info("Started adding first record from each file");
-
-    SortTempFileChunkHolder sortTempFileChunkHolder = null;
-
-    for (File tempFile : intermediateFiles) {
-      // create chunk holder
-      sortTempFileChunkHolder =
-          new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
-              mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
-              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getMeasureDataType(),
-              mergerParameters.getNoDictionaryDimnesionColumn(),
-              mergerParameters.getNoDictionarySortColumn());
-
-      // initialize
-      sortTempFileChunkHolder.initialize();
-      sortTempFileChunkHolder.readRow();
-      this.totalNumberOfRecords += sortTempFileChunkHolder.getEntryCount();
-
-      // add to heap
-      this.recordHolderHeap.add(sortTempFileChunkHolder);
-    }
-
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
-  }
-
-  /**
-   * Creates the heap that holds one chunk holder per input file.
-   *
-   * @param listFiles list of temp files
-   */
-  private void createRecordHolderQueue(File[] listFiles) {
-    // creating record holder heap
-    this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
-  }
-
-  /**
-   * @return the next row in sorted order
-   * @throws CarbonSortKeyAndGroupByException if reading fails
-   */
-  private Object[] next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromFile();
-  }
-
-  /**
-   * @return true while at least one input file still has rows
-   */
-  private boolean hasNext() {
-    return this.fileCounter > 0;
-  }
-
-  /**
-   * Writes one merged row. In compression/prefetch mode the row is buffered, and the buffer is
-   * handed to {@link #writer} once it holds {@link #totalSize} rows. Otherwise the row goes
-   * straight to {@link #stream}: each dimension in column order (no-dictionary ones as a short
-   * length plus bytes, dictionary ones as an int), then each measure as a presence byte followed
-   * by its value encoded according to its data type.
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
-    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
-      if (entryCount == 0) {
-        records = new Object[totalSize][];
-      }
-      records[entryCount++] = row;
-      // checked for every row, including the first, so a buffer size of 1 is flushed as well
-      if (entryCount == totalSize) {
-        this.writer.writeSortTempFile(records);
-        entryCount = 0;
-      }
-      return;
-    }
-    try {
-      DataType[] aggType = mergerParameters.getMeasureDataType();
-      int[] mdkArray = (int[]) row[0];
-      byte[][] nonDictArray = (byte[][]) row[1];
-      int mdkIndex = 0;
-      int nonDictKeyIndex = 0;
-      // write dictionary and non dictionary dimensions here.
-      for (boolean nodictinary : noDictionarycolumnMapping) {
-        if (nodictinary) {
-          byte[] col = nonDictArray[nonDictKeyIndex++];
-          stream.writeShort(col.length);
-          stream.write(col);
-        } else {
-          stream.writeInt(mdkArray[mdkIndex++]);
-        }
-      }
-
-      for (int fieldIndex = 0; fieldIndex < mergerParameters.getMeasureColCount(); fieldIndex++) {
-        Object measure = NonDictionaryUtil.getMeasure(fieldIndex, row);
-        if (null == measure) {
-          stream.write((byte) 0);
-          continue;
-        }
-        stream.write((byte) 1);
-        switch (aggType[fieldIndex]) {
-          case SHORT:
-            stream.writeShort((short) measure);
-            break;
-          case INT:
-            stream.writeInt((int) measure);
-            break;
-          case LONG:
-            stream.writeLong((long) measure);
-            break;
-          case DOUBLE:
-            stream.writeDouble((Double) measure);
-            break;
-          case DECIMAL:
-            byte[] bigDecimalInBytes = (byte[]) measure;
-            stream.writeInt(bigDecimalInBytes.length);
-            stream.write(bigDecimalInBytes);
-            break;
-          default:
-            throw new IllegalArgumentException("unsupported data type:" + aggType[fieldIndex]);
-        }
-      }
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
-    }
-  }
-
-  /**
-   * Closes the remaining chunk holders and deletes the intermediate input files.
-   *
-   * @throws CarbonSortKeyAndGroupByException if the intermediate files cannot be deleted
-   */
-  private void finish() throws CarbonSortKeyAndGroupByException {
-    closeRecordHolders();
-    try {
-      CarbonUtil.deleteFiles(intermediateFiles);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(
-          "Problem while deleting the intermediate files", e);
-    }
-  }
-
-  /**
-   * Closes the stream of every chunk holder still in the heap and empties the heap.
-   * NOTE(review): declares the checked exception in case
-   * {@code SortTempFileChunkHolder#closeStream()} throws it (its signature is not visible here).
-   */
-  private void closeRecordHolders() throws CarbonSortKeyAndGroupByException {
-    if (recordHolderHeap == null) {
-      return;
-    }
-    SortTempFileChunkHolder holder;
-    while ((holder = recordHolderHeap.poll()) != null) {
-      holder.closeStream();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
deleted file mode 100644
index 247251e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-
-/**
- * Compares rows over their sort columns in order: no-dictionary columns as byte arrays via
- * {@code UnsafeComparer}, dictionary columns as ints.
- */
-public class NewRowComparator implements Comparator<Object[]> {
-
-  /**
-   * mapping of dictionary dimensions and no dictionary of sort_column.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
-  /**
-   * @param noDictionarySortColumnMaping one entry per sort column; {@code true} marks a
-   *                                     no-dictionary (byte[]) column, {@code false} an
-   *                                     int column
-   */
-  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
-  }
-
-  /**
-   * Compares the sort columns of two rows in order and returns the first non-zero result, or 0
-   * when all are equal. Int columns use {@link Integer#compare}, because subtraction overflows
-   * for operands far apart (e.g. {@code Integer.MIN_VALUE - 1}).
-   */
-  @Override public int compare(Object[] rowA, Object[] rowB) {
-    for (int index = 0; index < noDictionarySortColumnMaping.length; index++) {
-      int result;
-      if (noDictionarySortColumnMaping[index]) {
-        result = UnsafeComparer.INSTANCE.compareTo((byte[]) rowA[index], (byte[]) rowB[index]);
-      } else {
-        result = Integer.compare((int) rowA[index], (int) rowB[index]);
-      }
-      if (result != 0) {
-        return result;
-      }
-    }
-    return 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
deleted file mode 100644
index 241882e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-/**
- * Comparator for rows whose leading sort fields are dictionary surrogates stored as Integers.
- * The first {@code numberOfSortColumns} fields are compared numerically.
- */
-public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
-  /**
-   * Number of leading row fields to compare.
-   */
-  private int numberOfSortColumns;
-
-  /**
-   * RowComparatorForNormalDims Constructor
-   *
-   * @param numberOfSortColumns number of leading row fields to compare
-   */
-  public NewRowComparatorForNormalDims(int numberOfSortColumns) {
-    this.numberOfSortColumns = numberOfSortColumns;
-  }
-
-  /**
-   * Compares the first {@code numberOfSortColumns} fields of two rows as ints and returns the
-   * first non-zero result, or 0 when all are equal. Uses {@link Integer#compare} because
-   * subtraction overflows for operands far apart (e.g. {@code Integer.MIN_VALUE - 1}).
-   *
-   * @see Comparator#compare(Object, Object)
-   */
-  @Override public int compare(Object[] rowA, Object[] rowB) {
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int result = Integer.compare((int) rowA[i], (int) rowB[i]);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
deleted file mode 100644
index 11c42a9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-public class RowComparator implements Comparator<Object[]> {
- /**
- * noDictionaryCount represent number of no dictionary cols
- */
- private int noDictionaryCount;
-
- /**
- * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
- */
- private boolean[] noDictionarySortColumnMaping;
-
- /**
- * @param noDictionarySortColumnMaping
- * @param noDictionaryCount
- */
- public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
- this.noDictionaryCount = noDictionaryCount;
- this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
- }
-
- /**
- * Below method will be used to compare two mdkey
- */
- public int compare(Object[] rowA, Object[] rowB) {
- int diff = 0;
-
- int normalIndex = 0;
- int noDictionaryindex = 0;
-
- for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
- if (isNoDictionary) {
- byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
- ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
-
- byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
- ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
-
- // extract a high card dims from complete byte[].
- NonDictionaryUtil
- .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
-
- int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
- if (difference != 0) {
- return difference;
- }
- noDictionaryindex++;
- } else {
- int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
- int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- normalIndex++;
- }
-
- }
-
- return diff;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
deleted file mode 100644
index be29bf8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-/**
- * This class is used as comparator for comparing dims which are non high cardinality dims.
- * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
- */
-public class RowComparatorForNormalDims implements Comparator<Object[]> {
- /**
- * dimension count
- */
- private int numberOfSortColumns;
-
- /**
- * RowComparatorForNormalDims Constructor
- *
- * @param numberOfSortColumns
- */
- public RowComparatorForNormalDims(int numberOfSortColumns) {
- this.numberOfSortColumns = numberOfSortColumns;
- }
-
- /**
- * Below method will be used to compare two surrogate keys
- *
- * @see Comparator#compare(Object, Object)
- */
- public int compare(Object[] rowA, Object[] rowB) {
- int diff = 0;
-
- for (int i = 0; i < numberOfSortColumns; i++) {
-
- int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
- int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
-
- diff = dimFieldA - dimFieldB;
- if (diff != 0) {
- return diff;
- }
- }
- return diff;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
deleted file mode 100644
index 71fc564..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class SortDataRows {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SortDataRows.class.getName());
- /**
- * entryCount
- */
- private int entryCount;
- /**
- * record holder array
- */
- private Object[][] recordHolderList;
- /**
- * threadStatusObserver
- */
- private ThreadStatusObserver threadStatusObserver;
- /**
- * executor service for data sort holder
- */
- private ExecutorService dataSorterAndWriterExecutorService;
- /**
- * semaphore which will used for managing sorted data object arrays
- */
- private Semaphore semaphore;
-
- private SortParameters parameters;
-
- private int sortBufferSize;
-
- private SortIntermediateFileMerger intermediateFileMerger;
-
- private final Object addRowsLock = new Object();
-
- public SortDataRows(SortParameters parameters,
- SortIntermediateFileMerger intermediateFileMerger) {
- this.parameters = parameters;
-
- this.intermediateFileMerger = intermediateFileMerger;
-
- int batchSize = CarbonProperties.getInstance().getBatchSize();
-
- this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
- // observer of writing file in thread
- this.threadStatusObserver = new ThreadStatusObserver();
- }
-
- /**
- * This method will be used to initialize
- */
- public void initialize() throws CarbonSortKeyAndGroupByException {
-
- // create holder list which will hold incoming rows
- // size of list will be sort buffer size + 1 to avoid creation of new
- // array in list array
- this.recordHolderList = new Object[sortBufferSize][];
- // Delete if any older file exists in sort temp folder
- deleteSortLocationIfExists();
-
- // create new sort temp directory
- CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
- this.dataSorterAndWriterExecutorService =
- Executors.newFixedThreadPool(parameters.getNumberOfCores());
- semaphore = new Semaphore(parameters.getNumberOfCores());
- }
-
- /**
- * This method will be used to add new row
- *
- * @param row new row
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
- // if record holder list size is equal to sort buffer size then it will
- // sort the list and then write current list data to file
- int currentSize = entryCount;
-
- if (sortBufferSize == currentSize) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("************ Writing to temp file ********** ");
- }
- intermediateFileMerger.startMergingIfPossible();
- Object[][] recordHolderListLocal = recordHolderList;
- try {
- semaphore.acquire();
- dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
- } catch (InterruptedException e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e.getMessage());
- }
- // create the new holder Array
- this.recordHolderList = new Object[this.sortBufferSize][];
- this.entryCount = 0;
- }
- recordHolderList[entryCount++] = row;
- }
-
- /**
- * This method will be used to add new row
- *
- * @param rowBatch new rowBatch
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
- // if record holder list size is equal to sort buffer size then it will
- // sort the list and then write current list data to file
- synchronized (addRowsLock) {
- int sizeLeft = 0;
- if (entryCount + size >= sortBufferSize) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("************ Writing to temp file ********** ");
- }
- intermediateFileMerger.startMergingIfPossible();
- Object[][] recordHolderListLocal = recordHolderList;
- sizeLeft = sortBufferSize - entryCount ;
- if (sizeLeft > 0) {
- System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
- }
- try {
- semaphore.acquire();
- dataSorterAndWriterExecutorService
- .execute(new DataSorterAndWriter(recordHolderListLocal));
- } catch (Exception e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e);
- }
- // create the new holder Array
- this.recordHolderList = new Object[this.sortBufferSize][];
- this.entryCount = 0;
- size = size - sizeLeft;
- if (size == 0) {
- return;
- }
- }
- System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
- entryCount += size;
- }
- }
-
- /**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- public void startSorting() throws CarbonSortKeyAndGroupByException {
- LOGGER.info("File based sorting will be used");
- if (this.entryCount > 0) {
- Object[][] toSort;
- toSort = new Object[entryCount][];
- System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
- if (parameters.getNumberOfNoDictSortColumns() > 0) {
- Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
- } else {
- Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
- }
- recordHolderList = toSort;
-
- // create new file and choose folder randomly
- String[] tmpLocation = parameters.getTempFileLocation();
- String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)];
- File file = new File(
- locationChosen + File.separator + parameters.getTableName() +
- System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
- writeDataTofile(recordHolderList, this.entryCount, file);
-
- }
-
- startFileBasedMerge();
- this.recordHolderList = null;
- }
-
- /**
- * Below method will be used to write data to file
- *
- * @throws CarbonSortKeyAndGroupByException problem while writing
- */
- private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
- throws CarbonSortKeyAndGroupByException {
- // stream
- if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) {
- writeSortTempFile(recordHolderList, entryCountLocal, file);
- return;
- }
- writeData(recordHolderList, entryCountLocal, file);
- }
-
- private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
- throws CarbonSortKeyAndGroupByException {
- TempSortFileWriter writer = null;
-
- try {
- writer = getWriter();
- writer.initiaize(file, entryCountLocal);
- writer.writeSortTempFile(recordHolderList);
- } catch (CarbonSortKeyAndGroupByException e) {
- LOGGER.error(e, "Problem while writing the sort temp file");
- throw e;
- } finally {
- if (writer != null) {
- writer.finish();
- }
- }
- }
-
- private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
- throws CarbonSortKeyAndGroupByException {
- DataOutputStream stream = null;
- try {
- // open stream
- stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
- parameters.getFileWriteBufferSize()));
-
- // write number of entries to the file
- stream.writeInt(entryCountLocal);
- int complexDimColCount = parameters.getComplexDimColCount();
- int dimColCount = parameters.getDimColCount() + complexDimColCount;
- DataType[] type = parameters.getMeasureDataType();
- boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
- Object[] row = null;
- for (int i = 0; i < entryCountLocal; i++) {
- // get row from record holder list
- row = recordHolderList[i];
- int dimCount = 0;
- // write dictionary and non dictionary dimensions here.
- for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
- if (noDictionaryDimnesionMapping[dimCount]) {
- byte[] col = (byte[]) row[dimCount];
- stream.writeShort(col.length);
- stream.write(col);
- } else {
- stream.writeInt((int)row[dimCount]);
- }
- }
- // write complex dimensions here.
- for (; dimCount < dimColCount; dimCount++) {
- byte[] value = (byte[])row[dimCount];
- stream.writeShort(value.length);
- stream.write(value);
- }
- // as measures are stored in separate array.
- for (int mesCount = 0;
- mesCount < parameters.getMeasureColCount(); mesCount++) {
- Object value = row[mesCount + dimColCount];
- if (null != value) {
- stream.write((byte) 1);
- switch (type[mesCount]) {
- case SHORT:
- stream.writeShort((Short) value);
- break;
- case INT:
- stream.writeInt((Integer) value);
- break;
- case LONG:
- stream.writeLong((Long) value);
- break;
- case DOUBLE:
- stream.writeDouble((Double) value);
- break;
- case DECIMAL:
- BigDecimal val = (BigDecimal) value;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
- }
- } else {
- stream.write((byte) 0);
- }
- }
- }
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
- } finally {
- // close streams
- CarbonUtil.closeStreams(stream);
- }
- }
-
- private TempSortFileWriter getWriter() {
- TempSortFileWriter chunkWriter = null;
- TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
- .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
- parameters.getDimColCount(), parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
- parameters.getFileWriteBufferSize());
-
- if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
- chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize());
- } else {
- chunkWriter =
- new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression());
- }
-
- return chunkWriter;
- }
-
- /**
- * This method will be used to delete sort temp location is it is exites
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
- CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
- }
-
- /**
- * Below method will be used to start file based merge
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
- try {
- dataSorterAndWriterExecutorService.shutdown();
- dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
- }
- }
-
- /**
- * Observer class for thread execution
- * In case of any failure we need stop all the running thread
- */
- private class ThreadStatusObserver {
- /**
- * Below method will be called if any thread fails during execution
- *
- * @param exception
- * @throws CarbonSortKeyAndGroupByException
- */
- public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
- dataSorterAndWriterExecutorService.shutdownNow();
- intermediateFileMerger.close();
- parameters.getObserver().setFailed(true);
- LOGGER.error(exception);
- throw new CarbonSortKeyAndGroupByException(exception);
- }
- }
-
- /**
- * This class is responsible for sorting and writing the object
- * array which holds the records equal to given array size
- */
- private class DataSorterAndWriter implements Runnable {
- private Object[][] recordHolderArray;
-
- public DataSorterAndWriter(Object[][] recordHolderArray) {
- this.recordHolderArray = recordHolderArray;
- }
-
- @Override
- public void run() {
- try {
- long startTime = System.currentTimeMillis();
- if (parameters.getNumberOfNoDictSortColumns() > 0) {
- Arrays.sort(recordHolderArray,
- new NewRowComparator(parameters.getNoDictionarySortColumn()));
- } else {
- Arrays.sort(recordHolderArray,
- new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
- }
-
- // create a new file and choose folder randomly every time
- String[] tmpFileLocation = parameters.getTempFileLocation();
- String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
- File sortTempFile = new File(
- locationChosen + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
- writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- intermediateFileMerger.addFileToMerge(sortTempFile);
- LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
- System.currentTimeMillis() - startTime));
- } catch (Throwable e) {
- try {
- threadStatusObserver.notifyFailed(e);
- } catch (CarbonSortKeyAndGroupByException ex) {
- LOGGER.error(ex);
- }
- } finally {
- semaphore.release();
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
deleted file mode 100644
index 6bda88a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * It does mergesort intermediate files to big file.
- */
-public class SortIntermediateFileMerger {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
-
- /**
- * executorService
- */
- private ExecutorService executorService;
- /**
- * procFiles
- */
- private List<File> procFiles;
-
- private SortParameters parameters;
-
- private final Object lockObject = new Object();
-
- public SortIntermediateFileMerger(SortParameters parameters) {
- this.parameters = parameters;
- // processed file list
- this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
- }
-
- public void addFileToMerge(File sortTempFile) {
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- synchronized (lockObject) {
- procFiles.add(sortTempFile);
- }
- }
-
- public void startMergingIfPossible() {
- File[] fileList;
- if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
- synchronized (lockObject) {
- fileList = procFiles.toArray(new File[procFiles.size()]);
- this.procFiles = new ArrayList<File>();
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
- }
- startIntermediateMerging(fileList);
- }
- }
-
- /**
- * Below method will be used to start the intermediate file merging
- *
- * @param intermediateFiles
- */
- private void startIntermediateMerging(File[] intermediateFiles) {
- int index = new Random().nextInt(parameters.getTempFileLocation().length);
- String chosenTempDir = parameters.getTempFileLocation()[index];
- File file = new File(
- chosenTempDir + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
- IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
- executorService.execute(merger);
- }
-
- public void finish() throws CarbonSortKeyAndGroupByException {
- try {
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
- }
- procFiles.clear();
- procFiles = null;
- }
-
- public void close() {
- if (executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
deleted file mode 100644
index fb2977e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.schema.metadata.SortObserver;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-public class SortParameters implements Serializable {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SortParameters.class.getName());
- /**
- * tempFileLocation
- */
- private String[] tempFileLocation;
- /**
- * sortBufferSize
- */
- private int sortBufferSize;
- /**
- * measure count
- */
- private int measureColCount;
- /**
- * measure count
- */
- private int dimColCount;
- /**
- * measure count
- */
- private int complexDimColCount;
- /**
- * fileBufferSize
- */
- private int fileBufferSize;
- /**
- * numberOfIntermediateFileToBeMerged
- */
- private int numberOfIntermediateFileToBeMerged;
- /**
- * fileWriteBufferSize
- */
- private int fileWriteBufferSize;
- /**
- * observer
- */
- private SortObserver observer;
- /**
- * sortTempFileNoOFRecordsInCompression
- */
- private int sortTempFileNoOFRecordsInCompression;
- /**
- * isSortTempFileCompressionEnabled
- */
- private boolean isSortFileCompressionEnabled;
- /**
- * prefetch
- */
- private boolean prefetch;
- /**
- * bufferSize
- */
- private int bufferSize;
-
- private String databaseName;
-
- private String tableName;
-
- private DataType[] measureDataType;
-
- /**
- * To know how many columns are of high cardinality.
- */
- private int noDictionaryCount;
- /**
- * partitionID
- */
- private String partitionID;
- /**
- * Id of the load folder
- */
- private String segmentId;
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
-
- /**
- * This will tell whether dimension is dictionary or not.
- */
- private boolean[] noDictionaryDimnesionColumn;
-
- private boolean[] noDictionarySortColumn;
-
- private int numberOfSortColumns;
-
- private int numberOfNoDictSortColumns;
-
- private int numberOfCores;
-
- private int batchSortSizeinMb;
-
- public SortParameters getCopy() {
- SortParameters parameters = new SortParameters();
- parameters.tempFileLocation = tempFileLocation;
- parameters.sortBufferSize = sortBufferSize;
- parameters.measureColCount = measureColCount;
- parameters.dimColCount = dimColCount;
- parameters.complexDimColCount = complexDimColCount;
- parameters.fileBufferSize = fileBufferSize;
- parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
- parameters.fileWriteBufferSize = fileWriteBufferSize;
- parameters.observer = observer;
- parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
- parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled;
- parameters.prefetch = prefetch;
- parameters.bufferSize = bufferSize;
- parameters.databaseName = databaseName;
- parameters.tableName = tableName;
- parameters.measureDataType = measureDataType;
- parameters.noDictionaryCount = noDictionaryCount;
- parameters.partitionID = partitionID;
- parameters.segmentId = segmentId;
- parameters.taskNo = taskNo;
- parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
- parameters.noDictionarySortColumn = noDictionarySortColumn;
- parameters.numberOfSortColumns = numberOfSortColumns;
- parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
- parameters.numberOfCores = numberOfCores;
- parameters.batchSortSizeinMb = batchSortSizeinMb;
- return parameters;
- }
-
- /**
- * @return the directories that hold sort temp files
- */
- public String[] getTempFileLocation() {
- return this.tempFileLocation;
- }
-
- /**
- * @param tempFileLocation the directories that hold sort temp files
- */
- public void setTempFileLocation(String[] tempFileLocation) {
- this.tempFileLocation = tempFileLocation;
- }
-
- /**
- * @return the configured sort buffer size
- */
- public int getSortBufferSize() {
- return this.sortBufferSize;
- }
-
- /**
- * @param sortBufferSize the sort buffer size to use
- */
- public void setSortBufferSize(int sortBufferSize) {
- this.sortBufferSize = sortBufferSize;
- }
-
- /**
- * @return the number of measure columns
- */
- public int getMeasureColCount() {
- return this.measureColCount;
- }
-
- /**
- * @param measureColCount the number of measure columns
- */
- public void setMeasureColCount(int measureColCount) {
- this.measureColCount = measureColCount;
- }
-
- /**
- * @return the number of (non-complex) dimension columns
- */
- public int getDimColCount() {
- return this.dimColCount;
- }
-
- /**
- * @param dimColCount the number of (non-complex) dimension columns
- */
- public void setDimColCount(int dimColCount) {
- this.dimColCount = dimColCount;
- }
-
- /**
- * @return the number of complex dimension columns
- */
- public int getComplexDimColCount() {
- return this.complexDimColCount;
- }
-
- /**
- * @param complexDimColCount the number of complex dimension columns
- */
- public void setComplexDimColCount(int complexDimColCount) {
- this.complexDimColCount = complexDimColCount;
- }
-
- /**
- * @return the file buffer size
- */
- public int getFileBufferSize() {
- return this.fileBufferSize;
- }
-
- /**
- * @param fileBufferSize the file buffer size
- */
- public void setFileBufferSize(int fileBufferSize) {
- this.fileBufferSize = fileBufferSize;
- }
-
- /**
- * @return how many intermediate files are merged together
- */
- public int getNumberOfIntermediateFileToBeMerged() {
- return this.numberOfIntermediateFileToBeMerged;
- }
-
- /**
- * @param numberOfIntermediateFileToBeMerged how many intermediate files are merged together
- */
- public void setNumberOfIntermediateFileToBeMerged(int numberOfIntermediateFileToBeMerged) {
- this.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
- }
-
- /**
- * @return the write buffer size used for sort files
- */
- public int getFileWriteBufferSize() {
- return this.fileWriteBufferSize;
- }
-
- /**
- * @param fileWriteBufferSize the write buffer size used for sort files
- */
- public void setFileWriteBufferSize(int fileWriteBufferSize) {
- this.fileWriteBufferSize = fileWriteBufferSize;
- }
-
- /**
- * @return the observer attached to this sort
- */
- public SortObserver getObserver() {
- return this.observer;
- }
-
- /**
- * @param observer the observer attached to this sort
- */
- public void setObserver(SortObserver observer) {
- this.observer = observer;
- }
-
- /**
- * @return the number of records grouped together when compressing sort temp files
- */
- public int getSortTempFileNoOFRecordsInCompression() {
- return this.sortTempFileNoOFRecordsInCompression;
- }
-
- /**
- * @param sortTempFileNoOFRecordsInCompression records per group when compressing sort temp files
- */
- public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
- this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
- }
-
- /**
- * @return true when sort temp files are written compressed
- */
- public boolean isSortFileCompressionEnabled() {
- return this.isSortFileCompressionEnabled;
- }
-
- /**
- * @param sortFileCompressionEnabled whether sort temp files are written compressed
- */
- public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
- this.isSortFileCompressionEnabled = sortFileCompressionEnabled;
- }
-
- /**
- * @return whether prefetch is enabled
- */
- public boolean isPrefetch() {
- return this.prefetch;
- }
-
- /**
- * @param prefetch whether prefetch is enabled
- */
- public void setPrefetch(boolean prefetch) {
- this.prefetch = prefetch;
- }
-
- /**
- * @return the prefetch buffer size
- */
- public int getBufferSize() {
- return this.bufferSize;
- }
-
- /**
- * @param bufferSize the prefetch buffer size
- */
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- /**
- * @return the database name of the table being sorted
- */
- public String getDatabaseName() {
- return this.databaseName;
- }
-
- /**
- * @param databaseName the database name of the table being sorted
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return the name of the table being sorted
- */
- public String getTableName() {
- return this.tableName;
- }
-
- /**
- * @param tableName the name of the table being sorted
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return the data types of the measure columns
- */
- public DataType[] getMeasureDataType() {
- return this.measureDataType;
- }
-
- /**
- * @param measureDataType the data types of the measure columns
- */
- public void setMeasureDataType(DataType[] measureDataType) {
- this.measureDataType = measureDataType;
- }
-
- /**
- * @return the number of no-dictionary dimension columns
- */
- public int getNoDictionaryCount() {
- return this.noDictionaryCount;
- }
-
- /**
- * @param noDictionaryCount the number of no-dictionary dimension columns
- */
- public void setNoDictionaryCount(int noDictionaryCount) {
- this.noDictionaryCount = noDictionaryCount;
- }
-
- /**
- * @return the partition id
- */
- public String getPartitionID() {
- return this.partitionID;
- }
-
- /**
- * @param partitionID the partition id
- */
- public void setPartitionID(String partitionID) {
- this.partitionID = partitionID;
- }
-
- /**
- * @return the segment id
- */
- public String getSegmentId() {
- return this.segmentId;
- }
-
- /**
- * @param segmentId the segment id
- */
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- /**
- * @return the task number
- */
- public String getTaskNo() {
- return this.taskNo;
- }
-
- /**
- * @param taskNo the task number
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return per-dimension flags marking which dimensions are no-dictionary columns
- */
- public boolean[] getNoDictionaryDimnesionColumn() {
- return this.noDictionaryDimnesionColumn;
- }
-
- /**
- * @param noDictionaryDimnesionColumn per-dimension no-dictionary flags (stored by reference)
- */
- public void setNoDictionaryDimnesionColumn(boolean[] noDictionaryDimnesionColumn) {
- this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
- }
-
- public int getNumberOfCores() {
- return numberOfCores;
- }
-
- public void setNumberOfCores(int numberOfCores) {
- this.numberOfCores = numberOfCores;
- }
-
- public int getNumberOfSortColumns() {
- return numberOfSortColumns;
- }
-
- public void setNumberOfSortColumns(int numberOfSortColumns) {
- this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
- }
-
- public boolean[] getNoDictionarySortColumn() {
- return noDictionarySortColumn;
- }
-
- public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
- this.noDictionarySortColumn = noDictionarySortColumn;
- }
-
- public int getNumberOfNoDictSortColumns() {
- return numberOfNoDictSortColumns;
- }
-
- public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
- this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount);
- }
-
- public int getBatchSortSizeinMb() {
- return batchSortSizeinMb;
- }
-
- public void setBatchSortSizeinMb(int batchSortSizeinMb) {
- this.batchSortSizeinMb = batchSortSizeinMb;
- }
-
- public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
- SortParameters parameters = new SortParameters();
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- CarbonProperties carbonProperties = CarbonProperties.getInstance();
- parameters.setDatabaseName(tableIdentifier.getDatabaseName());
- parameters.setTableName(tableIdentifier.getTableName());
- parameters.setPartitionID(configuration.getPartitionId());
- parameters.setSegmentId(configuration.getSegmentId());
- parameters.setTaskNo(configuration.getTaskNo());
- parameters.setMeasureColCount(configuration.getMeasureCount());
- parameters.setDimColCount(
- configuration.getDimensionCount() - configuration.getComplexColumnCount());
- parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
- parameters.setComplexDimColCount(configuration.getComplexColumnCount());
- parameters.setNoDictionaryDimnesionColumn(
- CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
- parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
-
- parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
- parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
- setNoDictionarySortColumnMapping(parameters);
- parameters.setObserver(new SortObserver());
- // get sort buffer size
- parameters.setSortBufferSize(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_SIZE,
- CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
- LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
- // set number of intermedaite file to merge
- parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
- CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
-
- LOGGER.info("Number of intermediate file to be merged: " + parameters
- .getNumberOfIntermediateFileToBeMerged());
-
- // get file buffer size
- parameters.setFileBufferSize(CarbonDataProcessorUtil
- .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
- CarbonCommonConstants.CONSTANT_SIZE_TEN));
-
- LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
-
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), configuration.getTaskNo(),
- configuration.getPartitionId(), configuration.getSegmentId(), false, false);
- String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-
- parameters.setTempFileLocation(sortTempDirs);
- LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
- int numberOfCores;
- try {
- numberOfCores = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- numberOfCores = numberOfCores / 2;
- } catch (NumberFormatException exc) {
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
- parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
-
- parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
- CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
-
- parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
- .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
- CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
- int sortTempFileNoOFRecordsInCompression;
- try {
- sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
- if (sortTempFileNoOFRecordsInCompression < 1) {
- LOGGER.error("Invalid value for: "
- + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
- + "be used");
-
- sortTempFileNoOFRecordsInCompression = Integer.parseInt(
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- } catch (NumberFormatException e) {
- LOGGER.error(
- "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ", only Positive Integer value is allowed. Default value will be used");
-
- sortTempFileNoOFRecordsInCompression = Integer
- .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
- if (parameters.isSortFileCompressionEnabled()) {
- LOGGER.info("Compression will be used for writing the sort temp File");
- }
-
- parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
- parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
-
- DataType[] measureDataType = configuration.getMeasureDataType();
- parameters.setMeasureDataType(measureDataType);
- return parameters;
- }
-
- /**
- * this method will set the boolean mapping for no dictionary sort columns
- *
- * @param parameters
- */
- private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
- if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
- parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
- } else {
- boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
- System
- .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp, 0,
- parameters.getNumberOfSortColumns());
- parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
- }
- }
-
- public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
- String tableName, int dimColCount, int complexDimColCount, int measureColCount,
- int noDictionaryCount, String partitionID, String segmentId, String taskNo,
- boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
- SortParameters parameters = new SortParameters();
- CarbonProperties carbonProperties = CarbonProperties.getInstance();
- parameters.setDatabaseName(databaseName);
- parameters.setTableName(tableName);
- parameters.setPartitionID(partitionID);
- parameters.setSegmentId(segmentId);
- parameters.setTaskNo(taskNo);
- parameters.setMeasureColCount(measureColCount);
- parameters.setDimColCount(dimColCount - complexDimColCount);
- parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
- parameters.setNoDictionaryCount(noDictionaryCount);
- parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
- parameters.setComplexDimColCount(complexDimColCount);
- parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
- parameters.setObserver(new SortObserver());
- // get sort buffer size
- parameters.setSortBufferSize(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_SIZE,
- CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
- LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
- // set number of intermedaite file to merge
- parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
- CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
-
- LOGGER.info("Number of intermediate file to be merged: " + parameters
- .getNumberOfIntermediateFileToBeMerged());
-
- // get file buffer size
- parameters.setFileBufferSize(CarbonDataProcessorUtil
- .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
- CarbonCommonConstants.CONSTANT_SIZE_TEN));
-
- LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
-
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
- isCompactionFlow, false);
- String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- parameters.setTempFileLocation(sortTempDirs);
- LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
-
- int numberOfCores;
- try {
- numberOfCores = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- numberOfCores = numberOfCores / 2;
- } catch (NumberFormatException exc) {
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
- parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
-
- parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
- CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
-
- parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
- .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
- CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
-
- int sortTempFileNoOFRecordsInCompression;
- try {
- sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
- if (sortTempFileNoOFRecordsInCompression < 1) {
- LOGGER.error("Invalid value for: "
- + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
- + "be used");
-
- sortTempFileNoOFRecordsInCompression = Integer.parseInt(
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- } catch (NumberFormatException e) {
- LOGGER.error(
- "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ", only Positive Integer value is allowed. Default value will be used");
-
- sortTempFileNoOFRecordsInCompression = Integer
- .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-
- if (parameters.isSortFileCompressionEnabled()) {
- LOGGER.info("Compression will be used for writing the sort temp File");
- }
-
- parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE);
- parameters.setBufferSize(Integer.parseInt(carbonProperties.getProperty(
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
-
- DataType[] type = CarbonDataProcessorUtil
- .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
- parameters.getTableName());
- parameters.setMeasureDataType(type);
- setNoDictionarySortColumnMapping(parameters);
- return parameters;
- }
-
-}