You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:24 UTC
[08/20] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
deleted file mode 100644
index 20d9894..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-
-public class UnsafeInmemoryHolder implements SortTempChunkHolder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName());
-
- private int counter;
-
- private int actualSize;
-
- private UnsafeCarbonRowPage rowPage;
-
- private Object[] currentRow;
-
- private long address;
-
- private NewRowComparator comparator;
-
- private int columnSize;
-
- public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
- int numberOfSortColumns) {
- this.actualSize = rowPage.getBuffer().getActualSize();
- this.rowPage = rowPage;
- LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
- this.columnSize = columnSize;
- }
-
- public boolean hasNext() {
- if (counter < actualSize) {
- return true;
- }
- return false;
- }
-
- public void readRow() {
- currentRow = new Object[columnSize];
- address = rowPage.getBuffer().get(counter);
- rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
- counter++;
- }
-
- public Object[] getRow() {
- return currentRow;
- }
-
- @Override public int compareTo(SortTempChunkHolder o) {
- return comparator.compare(currentRow, o.getRow());
- }
-
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof UnsafeInmemoryHolder)) {
- return false;
- }
-
- UnsafeInmemoryHolder o = (UnsafeInmemoryHolder)obj;
-
- return this == o;
- }
-
- @Override public int hashCode() {
- return super.hashCode();
- }
-
- public int numberOfRows() {
- return actualSize;
- }
-
- public void close() {
- rowPage.freeMemory();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
deleted file mode 100644
index fa4534f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
-
-/**
- * It is used for merging unsafe inmemory intermediate data
- */
-public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
-
- private int counter;
-
- private int actualSize;
-
- private UnsafeCarbonRowPage rowPage;
-
- private UnsafeCarbonRowForMerge currentRow;
-
- private long address;
-
- private UnsafeRowComparator comparator;
-
- private Object baseObject;
-
- private byte index;
-
- public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) {
- this.actualSize = rowPage.getBuffer().getActualSize();
- this.rowPage = rowPage;
- LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new UnsafeRowComparator(rowPage);
- this.baseObject = rowPage.getDataBlock().getBaseObject();
- currentRow = new UnsafeCarbonRowForMerge();
- this.index = index;
- }
-
- public boolean hasNext() {
- if (counter < actualSize) {
- return true;
- }
- return false;
- }
-
- public void readRow() {
- address = rowPage.getBuffer().get(counter);
- currentRow = new UnsafeCarbonRowForMerge();
- currentRow.address = address + rowPage.getDataBlock().getBaseOffset();
- currentRow.index = index;
- counter++;
- }
-
- public UnsafeCarbonRowForMerge getRow() {
- return currentRow;
- }
-
- @Override public int compareTo(UnsafeInmemoryMergeHolder o) {
- return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
- }
-
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof UnsafeInmemoryMergeHolder)) {
- return false;
- }
-
- UnsafeInmemoryMergeHolder o = (UnsafeInmemoryMergeHolder)obj;
- return this == o;
- }
-
- @Override public int hashCode() {
- return super.hashCode();
- }
-
- public Object getBaseObject() {
- return baseObject;
- }
-
- public void close() {
- rowPage.freeMemory();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
deleted file mode 100644
index f5316e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
-
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName());
-
- /**
- * temp file
- */
- private File tempFile;
-
- /**
- * read stream
- */
- private DataInputStream stream;
-
- /**
- * entry count
- */
- private int entryCount;
-
- /**
- * return row
- */
- private Object[] returnRow;
-
- /**
- * number of measures
- */
- private int measureCount;
-
- /**
- * number of dimensionCount
- */
- private int dimensionCount;
-
- /**
- * number of complexDimensionCount
- */
- private int complexDimensionCount;
-
- /**
- * fileBufferSize for file reader stream size
- */
- private int fileBufferSize;
-
- private Object[][] currentBuffer;
-
- private Object[][] backupBuffer;
-
- private boolean isBackupFilled;
-
- private boolean prefetch;
-
- private int bufferSize;
-
- private int bufferRowCounter;
-
- private ExecutorService executorService;
-
- private Future<Void> submit;
-
- private int prefetchRecordsProceesed;
-
- /**
- * sortTempFileNoOFRecordsInCompression
- */
- private int sortTempFileNoOFRecordsInCompression;
-
- /**
- * isSortTempFileCompressionEnabled
- */
- private boolean isSortTempFileCompressionEnabled;
-
- /**
- * totalRecordFetch
- */
- private int totalRecordFetch;
-
- private int noDictionaryCount;
-
- private DataType[] measureDataType;
-
- private int numberOfObjectRead;
- /**
- * to store whether dimension is of dictionary type or not
- */
- private boolean[] isNoDictionaryDimensionColumn;
-
- private int nullSetWordsLength;
-
- private Comparator<Object[]> comparator;
-
- /**
- * Constructor to initialize
- */
- public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
- // set temp file
- this.tempFile = tempFile;
-
- // set measure and dimension count
- this.measureCount = parameters.getMeasureColCount();
- this.dimensionCount = parameters.getDimColCount();
- this.complexDimensionCount = parameters.getComplexDimColCount();
-
- this.noDictionaryCount = parameters.getNoDictionaryCount();
- // set mdkey length
- this.fileBufferSize = parameters.getFileBufferSize();
- this.executorService = Executors.newFixedThreadPool(1);
- this.measureDataType = parameters.getMeasureDataType();
- this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
- this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
- comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
- initialize();
- }
-
- /**
- * This method will be used to initialize
- *
- * @throws CarbonSortKeyAndGroupByException problem while initializing
- */
- public void initialize() {
- prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
- CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
- bufferSize = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
- this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
- CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
- if (this.isSortTempFileCompressionEnabled) {
- LOGGER.info("Compression was used while writing the sortTempFile");
- }
-
- try {
- this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
- if (this.sortTempFileNoOFRecordsInCompression < 1) {
- LOGGER.error("Invalid value for: "
- + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
- + " be used");
-
- this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- } catch (NumberFormatException e) {
- LOGGER.error(
- "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ", only Positive Integer value is allowed.Default value will be used");
- this.sortTempFileNoOFRecordsInCompression = Integer
- .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
-
- initialise();
- }
-
- private void initialise() {
- try {
- if (isSortTempFileCompressionEnabled) {
- this.bufferSize = sortTempFileNoOFRecordsInCompression;
- }
- stream = new DataInputStream(
- new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
- this.entryCount = stream.readInt();
- LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
- if (prefetch) {
- new DataFetcher(false).call();
- totalRecordFetch += currentBuffer.length;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- } else {
- if (isSortTempFileCompressionEnabled) {
- new DataFetcher(false).call();
- }
- }
-
- } catch (FileNotFoundException e) {
- LOGGER.error(e);
- throw new RuntimeException(tempFile + " No Found", e);
- } catch (IOException e) {
- LOGGER.error(e);
- throw new RuntimeException(tempFile + " No Found", e);
- } catch (Exception e) {
- LOGGER.error(e);
- throw new RuntimeException(tempFile + " Problem while reading", e);
- }
- }
-
- /**
- * This method will be used to read new row from file
- *
- * @throws CarbonSortKeyAndGroupByException problem while reading
- */
- public void readRow() throws CarbonSortKeyAndGroupByException {
- if (prefetch) {
- fillDataForPrefetch();
- } else if (isSortTempFileCompressionEnabled) {
- if (bufferRowCounter >= bufferSize) {
- try {
- new DataFetcher(false).call();
- bufferRowCounter = 0;
- } catch (Exception e) {
- LOGGER.error(e);
- throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
- }
-
- }
- prefetchRecordsProceesed++;
- returnRow = currentBuffer[bufferRowCounter++];
- } else {
- this.returnRow = getRowFromStream();
- }
- }
-
- private void fillDataForPrefetch() {
- if (bufferRowCounter >= bufferSize) {
- if (isBackupFilled) {
- bufferRowCounter = 0;
- currentBuffer = backupBuffer;
- totalRecordFetch += currentBuffer.length;
- isBackupFilled = false;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- } else {
- try {
- submit.get();
- } catch (Exception e) {
- LOGGER.error(e);
- }
- bufferRowCounter = 0;
- currentBuffer = backupBuffer;
- isBackupFilled = false;
- totalRecordFetch += currentBuffer.length;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- }
- }
- prefetchRecordsProceesed++;
- returnRow = currentBuffer[bufferRowCounter++];
- }
-
- /**
- * @return
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
- Object[] row = new Object[dimensionCount + measureCount];
- try {
- int dimCount = 0;
- for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
- if (isNoDictionaryDimensionColumn[dimCount]) {
- short aShort = stream.readShort();
- byte[] col = new byte[aShort];
- stream.readFully(col);
- row[dimCount] = col;
- } else {
- int anInt = stream.readInt();
- row[dimCount] = anInt;
- }
- }
-
- // write complex dimensions here.
- for (; dimCount < dimensionCount; dimCount++) {
- short aShort = stream.readShort();
- byte[] col = new byte[aShort];
- stream.readFully(col);
- row[dimCount] = col;
- }
-
- long[] words = new long[nullSetWordsLength];
- for (int i = 0; i < words.length; i++) {
- words[i] = stream.readLong();
- }
-
- for (int mesCount = 0; mesCount < measureCount; mesCount++) {
- if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
- switch (measureDataType[mesCount]) {
- case SHORT:
- row[dimensionCount + mesCount] = stream.readShort();
- break;
- case INT:
- row[dimensionCount + mesCount] = stream.readInt();
- break;
- case LONG:
- row[dimensionCount + mesCount] = stream.readLong();
- break;
- case DOUBLE:
- row[dimensionCount + mesCount] = stream.readDouble();
- break;
- case DECIMAL:
- short aShort = stream.readShort();
- byte[] bigDecimalInBytes = new byte[aShort];
- stream.readFully(bigDecimalInBytes);
- row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" +
- measureDataType[mesCount]);
- }
- }
- }
- return row;
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException(e);
- }
- }
-
- /**
- * below method will be used to get the row
- *
- * @return row
- */
- public Object[] getRow() {
- return this.returnRow;
- }
-
- /**
- * below method will be used to check whether any more records are present
- * in file or not
- *
- * @return more row present in file
- */
- public boolean hasNext() {
- if (prefetch || isSortTempFileCompressionEnabled) {
- return this.prefetchRecordsProceesed < this.entryCount;
- }
- return this.numberOfObjectRead < this.entryCount;
- }
-
- /**
- * Below method will be used to close streams
- */
- public void close() {
- CarbonUtil.closeStreams(stream);
- executorService.shutdown();
- }
-
- /**
- * This method will number of entries
- *
- * @return entryCount
- */
- public int numberOfRows() {
- return entryCount;
- }
-
- @Override public int compareTo(SortTempChunkHolder other) {
- return comparator.compare(returnRow, other.getRow());
- }
-
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof UnsafeSortTempFileChunkHolder)) {
- return false;
- }
- UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj;
-
- return this == o;
- }
-
- @Override public int hashCode() {
- int hash = 0;
- hash += 31 * measureCount;
- hash += 31 * dimensionCount;
- hash += 31 * complexDimensionCount;
- hash += 31 * noDictionaryCount;
- hash += tempFile.hashCode();
- return hash;
- }
-
- private final class DataFetcher implements Callable<Void> {
- private boolean isBackUpFilling;
-
- private int numberOfRecords;
-
- private DataFetcher(boolean backUp) {
- isBackUpFilling = backUp;
- calculateNumberOfRecordsToBeFetched();
- }
-
- private void calculateNumberOfRecordsToBeFetched() {
- int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
- numberOfRecords =
- bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
- }
-
- @Override public Void call() throws Exception {
- try {
- if (isBackUpFilling) {
- backupBuffer = prefetchRecordsFromFile(numberOfRecords);
- isBackupFilled = true;
- } else {
- currentBuffer = prefetchRecordsFromFile(numberOfRecords);
- }
- } catch (Exception e) {
- LOGGER.error(e);
- }
- return null;
- }
-
- }
-
- /**
- * This method will read the records from sort temp file and keep it in a buffer
- *
- * @param numberOfRecords
- * @return
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[][] prefetchRecordsFromFile(int numberOfRecords)
- throws CarbonSortKeyAndGroupByException {
- Object[][] records = new Object[numberOfRecords][];
- for (int i = 0; i < numberOfRecords; i++) {
- records[i] = getRowFromStream();
- }
- return records;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
deleted file mode 100644
index 5480838..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.util.AbstractQueue;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRowForMerge;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());
-
- /**
- * recordHolderHeap
- */
- private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;
-
- /**
- * fileCounter
- */
- private int holderCounter;
-
- /**
- * entryCount
- */
- private int entryCount;
-
- private UnsafeCarbonRowPage[] unsafeCarbonRowPages;
-
- private long[] mergedAddresses;
-
- private byte[] rowPageIndexes;
-
- /**
- * IntermediateFileMerger Constructor
- */
- public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
- int totalSize) {
- this.holderCounter = unsafeCarbonRowPages.length;
- this.unsafeCarbonRowPages = unsafeCarbonRowPages;
- this.mergedAddresses = new long[totalSize];
- this.rowPageIndexes = new byte[totalSize];
- this.entryCount = 0;
- }
-
- @Override
- public void run() {
- long intermediateMergeStartTime = System.currentTimeMillis();
- int holderCounterConst = holderCounter;
- try {
- startSorting();
- while (hasNext()) {
- writeDataToMemory(next());
- }
- double intermediateMergeCostTime =
- (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
- LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
- + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
- } catch (Exception e) {
- LOGGER.error(e, "Problem while intermediate merging");
- }
- }
-
- /**
- * This method will be used to get the sorted record from file
- *
- * @return sorted record sorted record
- * @throws CarbonSortKeyAndGroupByException
- */
- private UnsafeCarbonRowForMerge getSortedRecordFromMemory()
- throws CarbonSortKeyAndGroupByException {
- UnsafeCarbonRowForMerge row = null;
-
- // poll the top object from heap
- // heap maintains binary tree which is based on heap condition that will
- // be based on comparator we are passing the heap
- // when will call poll it will always delete root of the tree and then
- // it does trickel down operation complexity is log(n)
- UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();
-
- // get the row from chunk
- row = poll.getRow();
-
- // check if there no entry present
- if (!poll.hasNext()) {
- // change the file counter
- --this.holderCounter;
-
- // reaturn row
- return row;
- }
-
- // read new row
- poll.readRow();
-
- // add to heap
- this.recordHolderHeap.add(poll);
-
- // return row
- return row;
- }
-
- /**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- private void startSorting() throws CarbonSortKeyAndGroupByException {
- LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
-
- // create record holder heap
- createRecordHolderQueue(unsafeCarbonRowPages);
-
- // iterate over file list and create chunk holder and add to heap
- LOGGER.info("Started adding first record from row page");
-
- UnsafeInmemoryMergeHolder unsafePageHolder = null;
- byte index = 0;
- for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
- // create chunk holder
- unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
-
- // initialize
- unsafePageHolder.readRow();
-
- // add to heap
- this.recordHolderHeap.add(unsafePageHolder);
- }
-
- LOGGER.info("Heap Size" + this.recordHolderHeap.size());
- }
-
- /**
- * This method will be used to create the heap which will be used to hold
- * the chunk of data
- */
- private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
- // creating record holder heap
- this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
- }
-
- /**
- * This method will be used to get the sorted row
- *
- * @return sorted row
- * @throws CarbonSortKeyAndGroupByException
- */
- private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
- return getSortedRecordFromMemory();
- }
-
- /**
- * This method will be used to check whether any more element is present or
- * not
- *
- * @return more element is present
- */
- private boolean hasNext() {
- return this.holderCounter > 0;
- }
-
- /**
- * Below method will be used to write data to file
- */
- private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
- mergedAddresses[entryCount] = row.address;
- rowPageIndexes[entryCount] = row.index;
- entryCount++;
- }
-
- public int getEntryCount() {
- return entryCount;
- }
-
- public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
- return unsafeCarbonRowPages;
- }
-
- public long[] getMergedAddresses() {
- return mergedAddresses;
- }
-
- public byte[] getRowPageIndexes() {
- return rowPageIndexes;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
deleted file mode 100644
index 63f6aab..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.AbstractQueue;
-import java.util.Arrays;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;
-
- /**
-  * Merges a group of sorted intermediate sort temp files into one output file.
-  *
-  * <p>One {@link UnsafeSortTempFileChunkHolder} is created per input file and kept in a
-  * priority queue. The head of the queue is polled repeatedly, its current row is serialized
-  * and written, and the holder is re-inserted until it has no more rows. When the merge
-  * succeeds the input files are deleted; when it fails the output file is deleted instead and
-  * the input files are left in place.
-  */
- public class UnsafeIntermediateFileMerger implements Runnable {
-   /**
-    * LOGGER
-    */
-   private static final LogService LOGGER =
-       LogServiceFactory.getLogService(UnsafeIntermediateFileMerger.class.getName());
-
-   /**
-    * Heap of chunk holders, ordered by the natural ordering of the holders.
-    */
-   private AbstractQueue<SortTempChunkHolder> recordHolderHeap;
-
-   /**
-    * Number of input files that still have rows to deliver.
-    */
-   private int fileCounter;
-
-   /**
-    * Output stream, only created when neither compression nor prefetch is enabled.
-    */
-   private DataOutputStream stream;
-
-   /**
-    * Sum of the row counts reported by all input files.
-    */
-   private int totalNumberOfRecords;
-
-   /**
-    * Temp sort file writer, only created when compression or prefetch is enabled.
-    */
-   private TempSortFileWriter writer;
-
-   private final SortParameters mergerParameters;
-
-   private final File[] intermediateFiles;
-
-   private final File outPutFile;
-
-   private final boolean[] noDictionarycolumnMapping;
-
-   /**
-    * Bit set with one bit per measure column; a set bit marks a non-null measure value.
-    */
-   private final long[] nullSetWords;
-
-   /**
-    * Reusable buffer in which a single row is serialized before it is written out.
-    */
-   private final ByteBuffer rowData;
-
-   /**
-    * IntermediateFileMerger Constructor
-    *
-    * @param mergerParameters  sort parameters describing the row layout and write settings
-    * @param intermediateFiles sorted temp files to be merged
-    * @param outPutFile        file that receives the merged rows
-    */
-   public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
-       File outPutFile) {
-     this.mergerParameters = mergerParameters;
-     this.fileCounter = intermediateFiles.length;
-     this.intermediateFiles = intermediateFiles;
-     this.outPutFile = outPutFile;
-     noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
-     // one bit per measure column, 64 bits per word
-     this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) >> 6) + 1];
-     // Take size of 2 MB for each row. I think it is high enough to use
-     rowData = ByteBuffer.allocate(2 * 1024 * 1024);
-   }
-
-   /**
-    * Runs the complete merge. Failures are logged, never propagated to the caller.
-    */
-   @Override
-   public void run() {
-     long intermediateMergeStartTime = System.currentTimeMillis();
-     int fileConterConst = fileCounter;
-     boolean isFailed = false;
-     try {
-       startSorting();
-       initialize();
-       while (hasNext()) {
-         writeDataTofile(next());
-       }
-       double intermediateMergeCostTime =
-           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-       LOGGER.info("============================== Intermediate Merge of " + fileConterConst
-           + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
-     } catch (Exception e) {
-       LOGGER.error(e, "Problem while intermediate merging");
-       isFailed = true;
-     } finally {
-       CarbonUtil.closeStreams(this.stream);
-       if (null != writer) {
-         writer.finish();
-       }
-       if (!isFailed) {
-         try {
-           finish();
-         } catch (CarbonSortKeyAndGroupByException e) {
-           LOGGER.error(e, "Problem while deleting the merge file");
-         }
-       } else {
-         // the partially written output is useless: remove it and report only a real failure
-         if (outPutFile.exists() && !outPutFile.delete()) {
-           LOGGER.error("Problem while deleting the merge file");
-         }
-         // release the holders that are still open; the input files themselves are kept
-         closeRemainingHolders();
-       }
-     }
-   }
-
-   /**
-    * This method is responsible for initializing the out stream
-    *
-    * <p>A plain {@link DataOutputStream} is opened, and the total record count written to it,
-    * when neither compression nor prefetch is enabled; otherwise a {@link TempSortFileWriter}
-    * is created and initialized instead.
-    *
-    * @throws CarbonSortKeyAndGroupByException if the output file cannot be opened or written
-    */
-   private void initialize() throws CarbonSortKeyAndGroupByException {
-     if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-       try {
-         this.stream = new DataOutputStream(
-             new BufferedOutputStream(new FileOutputStream(outPutFile),
-                 mergerParameters.getFileWriteBufferSize()));
-         this.stream.writeInt(this.totalNumberOfRecords);
-       } catch (FileNotFoundException e) {
-         throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-       } catch (IOException e) {
-         throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-       }
-     } else {
-       writer = TempSortFileWriterFactory.getInstance()
-           .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-               mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-               mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-               mergerParameters.getFileWriteBufferSize());
-       writer.initiaize(outPutFile, totalNumberOfRecords);
-     }
-   }
-
-   /**
-    * This method will be used to get the sorted record from file
-    *
-    * @return the current row of the holder at the head of the heap
-    * @throws CarbonSortKeyAndGroupByException if reading the following row fails
-    */
-   private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-     // poll() removes the head of the heap, i.e. the holder that currently sorts first
-     SortTempChunkHolder poll = this.recordHolderHeap.poll();
-
-     // get the row from chunk
-     Object[] row = poll.getRow();
-
-     if (!poll.hasNext()) {
-       // the chunk is exhausted: close it and count one input file less
-       poll.close();
-       --this.fileCounter;
-       return row;
-     }
-
-     // load the following row and put the holder back so it is re-ordered by that row
-     poll.readRow();
-     this.recordHolderHeap.add(poll);
-
-     return row;
-   }
-
-   /**
-    * Below method will be used to start the sorting process. It creates the record holder
-    * heap, creates one chunk holder per intermediate file, reads the first row of each and
-    * adds the holder to the heap, summing up the row counts on the way.
-    *
-    * @throws CarbonSortKeyAndGroupByException if reading the first row of a file fails
-    */
-   private void startSorting() throws CarbonSortKeyAndGroupByException {
-     LOGGER.info("Number of temp file: " + this.fileCounter);
-
-     // create record holder heap
-     createRecordHolderQueue(intermediateFiles);
-
-     // iterate over file list and create chunk holder and add to heap
-     LOGGER.info("Started adding first record from each file");
-
-     for (File tempFile : intermediateFiles) {
-       // create chunk holder
-       SortTempChunkHolder sortTempFileChunkHolder =
-           new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);
-
-       sortTempFileChunkHolder.readRow();
-       this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
-
-       // add to heap
-       this.recordHolderHeap.add(sortTempFileChunkHolder);
-     }
-
-     LOGGER.info("Heap Size" + this.recordHolderHeap.size());
-   }
-
-   /**
-    * This method will be used to create the heap which will be used to hold
-    * the chunk of data
-    *
-    * @param listFiles list of temp files; its length is the initial capacity of the heap
-    */
-   private void createRecordHolderQueue(File[] listFiles) {
-     // creating record holder heap
-     this.recordHolderHeap = new PriorityQueue<SortTempChunkHolder>(listFiles.length);
-   }
-
-   /**
-    * This method will be used to get the sorted row
-    *
-    * @return sorted row
-    * @throws CarbonSortKeyAndGroupByException if reading the following row fails
-    */
-   private Object[] next() throws CarbonSortKeyAndGroupByException {
-     return getSortedRecordFromFile();
-   }
-
-   /**
-    * This method will be used to check whether any more element is present or
-    * not
-    *
-    * @return more element is present
-    */
-   private boolean hasNext() {
-     return this.fileCounter > 0;
-   }
-
-   /**
-    * Serializes one row into the row buffer and writes it to the output stream.
-    *
-    * <p>Written layout: every column covered by the no-dictionary mapping (short length plus
-    * bytes when flagged, a 4 byte int otherwise), then the remaining dimensions up to
-    * dimension count plus complex dimension count (short length plus bytes), then the null
-    * bit set words (8 bytes each), then every non-null measure encoded by its data type.
-    *
-    * @param row the row to write
-    * @throws CarbonSortKeyAndGroupByException if no output stream was opened by
-    *                                          {@link #initialize()}
-    * @throws IOException                      if writing to the stream fails
-    */
-   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
-     if (null == stream) {
-       // initialize() took the TempSortFileWriter branch; fail clearly instead of with an NPE
-       throw new CarbonSortKeyAndGroupByException(
-           "Output stream is not initialized: merging with sort file compression or prefetch "
-               + "enabled is not supported by this merger");
-     }
-     int dimCount = 0;
-     int size = 0;
-     DataType[] type = mergerParameters.getMeasureDataType();
-     for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
-       if (noDictionarycolumnMapping[dimCount]) {
-         size += putLengthPrefixedBytes((byte[]) row[dimCount]);
-       } else {
-         rowData.putInt((int) row[dimCount]);
-         size += 4;
-       }
-     }
-
-     // write complex dimensions here.
-     int dimensionSize =
-         mergerParameters.getDimColCount() + mergerParameters.getComplexDimColCount();
-     int measureSize = mergerParameters.getMeasureColCount();
-     for (; dimCount < dimensionSize; dimCount++) {
-       size += putLengthPrefixedBytes((byte[]) row[dimCount]);
-     }
-     Arrays.fill(nullSetWords, 0);
-     // reserve room for the null bit set; it is filled in once all measures are known
-     int nullSetSize = nullSetWords.length * 8;
-     int nullLoc = size;
-     size += nullSetSize;
-     // measures are written with absolute puts, size is the running write offset
-     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-       Object value = row[mesCount + dimensionSize];
-       if (null != value) {
-         switch (type[mesCount]) {
-           case SHORT:
-             rowData.putShort(size, (Short) value);
-             size += 2;
-             break;
-           case INT:
-             rowData.putInt(size, (Integer) value);
-             size += 4;
-             break;
-           case LONG:
-             rowData.putLong(size, (Long) value);
-             size += 8;
-             break;
-           case DOUBLE:
-             rowData.putDouble(size, (Double) value);
-             size += 8;
-             break;
-           case DECIMAL:
-             byte[] bigDecimalInBytes = (byte[]) value;
-             rowData.putShort(size, (short) bigDecimalInBytes.length);
-             size += 2;
-             for (int i = 0; i < bigDecimalInBytes.length; i++) {
-               rowData.put(size++, bigDecimalInBytes[i]);
-             }
-             break;
-         }
-         UnsafeCarbonRowPage.set(nullSetWords, mesCount);
-       } else {
-         UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
-       }
-     }
-     for (int i = 0; i < nullSetWords.length; i++) {
-       rowData.putLong(nullLoc, nullSetWords[i]);
-       nullLoc += 8;
-     }
-     // copy the serialized row out of the buffer and reset the buffer for the next row
-     byte[] rowBytes = new byte[size];
-     rowData.position(0);
-     rowData.get(rowBytes);
-     stream.write(rowBytes);
-     rowData.clear();
-   }
-
-   /**
-    * Appends a short length followed by the bytes themselves at the current buffer position.
-    *
-    * @param col bytes to append
-    * @return number of bytes appended to the row buffer
-    */
-   private int putLengthPrefixedBytes(byte[] col) {
-     rowData.putShort((short) col.length);
-     rowData.put(col);
-     return 2 + col.length;
-   }
-
-   /**
-    * Closes every chunk holder that is still in the heap.
-    */
-   private void closeRemainingHolders() {
-     if (recordHolderHeap != null) {
-       SortTempChunkHolder holder;
-       while ((holder = recordHolderHeap.poll()) != null) {
-         holder.close();
-       }
-     }
-   }
-
-   /**
-    * Releases the remaining chunk holders and deletes the merged intermediate files.
-    *
-    * @throws CarbonSortKeyAndGroupByException if the intermediate files cannot be deleted
-    */
-   private void finish() throws CarbonSortKeyAndGroupByException {
-     closeRemainingHolders();
-     try {
-       CarbonUtil.deleteFiles(intermediateFiles);
-       rowData.clear();
-     } catch (IOException e) {
-       // keep the cause so the reason for the failed deletion is not lost
-       throw new CarbonSortKeyAndGroupByException(
-           "Problem while deleting the intermediate files", e);
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
deleted file mode 100644
index 49791e8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It does mergesort intermediate files to big file.
- */
-public class UnsafeIntermediateMerger {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeIntermediateMerger.class.getName());
-
- /**
- * executorService
- */
- private ExecutorService executorService;
- /**
- * rowPages
- */
- private List<UnsafeCarbonRowPage> rowPages;
-
- private List<UnsafeInMemoryIntermediateDataMerger> mergedPages;
-
- private SortParameters parameters;
-
- private final Object lockObject = new Object();
-
- private boolean offHeap;
-
- private List<File> procFiles;
-
- public UnsafeIntermediateMerger(SortParameters parameters) {
- this.parameters = parameters;
- // processed file list
- this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- this.mergedPages = new ArrayList<>();
- this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
- this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
- CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
- this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- }
-
- public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- synchronized (lockObject) {
- rowPages.add(rowPage);
- }
- }
-
- public void addFileToMerge(File sortTempFile) {
- // add sort temp filename to and arrayList. When the list size reaches 20 then
- // intermediate merging of sort temp files will be triggered
- synchronized (lockObject) {
- procFiles.add(sortTempFile);
- }
- }
-
- public void startFileMergingIfPossible() {
- File[] fileList;
- if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
- synchronized (lockObject) {
- fileList = procFiles.toArray(new File[procFiles.size()]);
- this.procFiles = new ArrayList<File>();
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
- }
- startIntermediateMerging(fileList);
- }
- }
-
- /**
- * Below method will be used to start the intermediate file merging
- *
- * @param intermediateFiles
- */
- private void startIntermediateMerging(File[] intermediateFiles) {
- //pick a temp location randomly
- String[] tempFileLocations = parameters.getTempFileLocation();
- String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
-
- File file = new File(
- targetLocation + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
- UnsafeIntermediateFileMerger merger =
- new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
- executorService.execute(merger);
- }
-
- public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
- UnsafeCarbonRowPage[] localRowPages;
- if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
- int totalRows = 0;
- synchronized (lockObject) {
- totalRows = getTotalNumberOfRows(rowPages);
- if (totalRows <= 0) {
- return;
- }
- localRowPages = rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]);
- this.rowPages = new ArrayList<>();
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sumitting request for intermediate merging of in-memory pages : "
- + localRowPages.length);
- }
- startIntermediateMerging(localRowPages, totalRows);
- }
- }
-
- /**
- * Below method will be used to start the intermediate file merging
- *
- * @param rowPages
- */
- private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows)
- throws CarbonSortKeyAndGroupByException {
- UnsafeInMemoryIntermediateDataMerger merger =
- new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
- mergedPages.add(merger);
- executorService.execute(merger);
- }
-
- private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {
- int totalSize = 0;
- for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
- totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
- }
- return totalSize;
- }
-
- public void finish() throws CarbonSortKeyAndGroupByException {
- try {
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
- }
- }
-
- public void close() {
- if (executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- rowPages.clear();
- rowPages = null;
- }
-
- public List<UnsafeCarbonRowPage> getRowPages() {
- return rowPages;
- }
-
- public List<UnsafeInMemoryIntermediateDataMerger> getMergedPages() {
- return mergedPages;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
deleted file mode 100644
index e3bbdcb..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.newflow.sort.SortStepRowUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeFinalMergePageHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeSingleThreadFinalSortFilesMerger.class.getName());
-
- /**
- * fileCounter
- */
- private int fileCounter;
-
- /**
- * recordHolderHeap
- */
- private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
-
- private SortParameters parameters;
-
- /**
- * tempFileLocation
- */
- private String[] tempFileLocation;
-
- private String tableName;
-
- private boolean isStopProcess;
-
- public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
- String[] tempFileLocation) {
- this.parameters = parameters;
- this.tempFileLocation = tempFileLocation;
- this.tableName = parameters.getTableName();
- }
-
- /**
- * This method will be used to merger the merged files
- *
- */
- public void startFinalMerge(UnsafeCarbonRowPage[] rowPages,
- List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
- startSorting(rowPages, merges);
- }
-
- /**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
- *
- */
- private void startSorting(UnsafeCarbonRowPage[] rowPages,
- List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
- try {
- List<File> filesToMergeSort = getFilesToMergeSort();
- this.fileCounter = rowPages.length + filesToMergeSort.size() + merges.size();
- if (fileCounter == 0) {
- LOGGER.info("No files to merge sort");
- return;
- }
- LOGGER.info("Number of row pages: " + this.fileCounter);
-
- // create record holder heap
- createRecordHolderQueue();
-
- // iterate over file list and create chunk holder and add to heap
- LOGGER.info("Started adding first record from each page");
- for (final UnsafeCarbonRowPage rowPage : rowPages) {
-
- SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
- parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
- .getMeasureColCount(), parameters.getNumberOfSortColumns());
-
- // initialize
- sortTempFileChunkHolder.readRow();
-
- recordHolderHeapLocal.add(sortTempFileChunkHolder);
- }
-
- for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
-
- SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
- parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
- .getMeasureColCount());
-
- // initialize
- sortTempFileChunkHolder.readRow();
-
- recordHolderHeapLocal.add(sortTempFileChunkHolder);
- }
-
- for (final File file : filesToMergeSort) {
-
- SortTempChunkHolder sortTempFileChunkHolder =
- new UnsafeSortTempFileChunkHolder(file, parameters);
-
- // initialize
- sortTempFileChunkHolder.readRow();
-
- recordHolderHeapLocal.add(sortTempFileChunkHolder);
- }
-
- LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
- } catch (Exception e) {
- LOGGER.error(e);
- throw new CarbonDataWriterException(e.getMessage());
- }
- }
-
- private List<File> getFilesToMergeSort() {
- FileFilter fileFilter = new FileFilter() {
- public boolean accept(File pathname) {
- return pathname.getName().startsWith(tableName);
- }
- };
-
- // get all the merged files
- List<File> files = new ArrayList<File>(tempFileLocation.length);
- for (String tempLoc : tempFileLocation)
- {
- File[] subFiles = new File(tempLoc).listFiles(fileFilter);
- if (null != subFiles && subFiles.length > 0)
- {
- files.addAll(Arrays.asList(subFiles));
- }
- }
-
- return files;
- }
-
- /**
- * This method will be used to create the heap which will be used to hold
- * the chunk of data
- */
- private void createRecordHolderQueue() {
- // creating record holder heap
- this.recordHolderHeapLocal = new PriorityQueue<SortTempChunkHolder>(fileCounter);
- }
-
- /**
- * This method will be used to get the sorted row
- *
- * @return sorted row
- */
- public Object[] next() {
- return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
- }
-
- /**
- * This method will be used to get the sorted record from file
- *
- * @return sorted record sorted record
- */
- private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
- Object[] row = null;
-
- // poll the top object from heap
- // heap maintains binary tree which is based on heap condition that will
- // be based on comparator we are passing the heap
- // when will call poll it will always delete root of the tree and then
- // it does trickel down operation complexity is log(n)
- SortTempChunkHolder poll = this.recordHolderHeapLocal.poll();
-
- // get the row from chunk
- row = poll.getRow();
-
- // check if there no entry present
- if (!poll.hasNext()) {
- // if chunk is empty then close the stream
- poll.close();
-
- // change the file counter
- --this.fileCounter;
-
- // reaturn row
- return row;
- }
-
- // read new row
- try {
- poll.readRow();
- } catch (Exception e) {
- throw new CarbonDataWriterException(e.getMessage(), e);
- }
-
- // add to heap
- this.recordHolderHeapLocal.add(poll);
-
- // return row
- return row;
- }
-
- /**
- * This method will be used to check whether any more element is present or
- * not
- *
- * @return more element is present
- */
- public boolean hasNext() {
- return this.fileCounter > 0;
- }
-
- public void clear() {
- if (null != recordHolderHeapLocal) {
- for (SortTempChunkHolder pageHolder : recordHolderHeapLocal) {
- pageHolder.close();
- }
- recordHolderHeapLocal = null;
- }
- }
-
- public boolean isStopProcess() {
- return isStopProcess;
- }
-
- public void setStopProcess(boolean stopProcess) {
- isStopProcess = stopProcess;
- }
-}