You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:19 UTC
[03/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
deleted file mode 100644
index 10b3ad5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
-
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());
-
- /**
- * temp file
- */
- private File tempFile;
-
- /**
- * read stream
- */
- private DataInputStream stream;
-
- /**
- * entry count
- */
- private int entryCount;
-
- /**
- * number record read
- */
- private int numberOfObjectRead;
-
- /**
- * return row
- */
- private Object[] returnRow;
-
- /**
- * number of measures
- */
- private int measureCount;
-
- /**
- * number of dimensionCount
- */
- private int dimensionCount;
-
- /**
- * number of complexDimensionCount
- */
- private int complexDimensionCount;
-
- /**
- * fileBufferSize for file reader stream size
- */
- private int fileBufferSize;
-
- private Object[][] currentBuffer;
-
- private Object[][] backupBuffer;
-
- private boolean isBackupFilled;
-
- private boolean prefetch;
-
- private int bufferSize;
-
- private int bufferRowCounter;
-
- private ExecutorService executorService;
-
- private Future<Void> submit;
-
- private int prefetchRecordsProceesed;
-
- /**
- * sortTempFileNoOFRecordsInCompression
- */
- private int sortTempFileNoOFRecordsInCompression;
-
- /**
- * isSortTempFileCompressionEnabled
- */
- private boolean isSortTempFileCompressionEnabled;
-
- /**
- * totalRecordFetch
- */
- private int totalRecordFetch;
-
- private int noDictionaryCount;
-
- private DataType[] aggType;
-
- /**
- * to store whether dimension is of dictionary type or not
- */
- private boolean[] isNoDictionaryDimensionColumn;
-
- /**
- * to store whether sort column is of dictionary type or not
- */
- private boolean[] isNoDictionarySortColumn;
-
- /**
- * Constructor to initialize
- *
- * @param tempFile
- * @param dimensionCount
- * @param complexDimensionCount
- * @param measureCount
- * @param fileBufferSize
- * @param noDictionaryCount
- * @param aggType
- * @param isNoDictionaryDimensionColumn
- */
- public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
- int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
- boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
- // set temp file
- this.tempFile = tempFile;
-
- // set measure and dimension count
- this.measureCount = measureCount;
- this.dimensionCount = dimensionCount;
- this.complexDimensionCount = complexDimensionCount;
-
- this.noDictionaryCount = noDictionaryCount;
- // set mdkey length
- this.fileBufferSize = fileBufferSize;
- this.executorService = Executors.newFixedThreadPool(1);
- this.aggType = aggType;
-
- this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
- this.isNoDictionarySortColumn = isNoDictionarySortColumn;
- }
-
- /**
- * This method will be used to initialize
- *
- * @throws CarbonSortKeyAndGroupByException problem while initializing
- */
- public void initialize() throws CarbonSortKeyAndGroupByException {
- prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
- CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
- bufferSize = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
- CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
- this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
- CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
- if (this.isSortTempFileCompressionEnabled) {
- LOGGER.info("Compression was used while writing the sortTempFile");
- }
-
- try {
- this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
- if (this.sortTempFileNoOFRecordsInCompression < 1) {
- LOGGER.error("Invalid value for: "
- + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
- + " be used");
-
- this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
- CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
- } catch (NumberFormatException e) {
- LOGGER.error(
- "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
- + ", only Positive Integer value is allowed.Default value will be used");
- this.sortTempFileNoOFRecordsInCompression = Integer
- .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
- }
-
- initialise();
- }
-
- private void initialise() throws CarbonSortKeyAndGroupByException {
- try {
- if (isSortTempFileCompressionEnabled) {
- this.bufferSize = sortTempFileNoOFRecordsInCompression;
- }
- stream = new DataInputStream(
- new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
- this.entryCount = stream.readInt();
- if (prefetch) {
- new DataFetcher(false).call();
- totalRecordFetch += currentBuffer.length;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- } else {
- if (isSortTempFileCompressionEnabled) {
- new DataFetcher(false).call();
- }
- }
-
- } catch (FileNotFoundException e) {
- LOGGER.error(e);
- throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
- } catch (IOException e) {
- LOGGER.error(e);
- throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e);
- } catch (Exception e) {
- LOGGER.error(e);
- throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
- }
- }
-
- /**
- * This method will be used to read new row from file
- *
- * @throws CarbonSortKeyAndGroupByException problem while reading
- */
- public void readRow() throws CarbonSortKeyAndGroupByException {
- if (prefetch) {
- fillDataForPrefetch();
- } else if (isSortTempFileCompressionEnabled) {
- if (bufferRowCounter >= bufferSize) {
- try {
- new DataFetcher(false).call();
- bufferRowCounter = 0;
- } catch (Exception e) {
- LOGGER.error(e);
- throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
- }
-
- }
- prefetchRecordsProceesed++;
- returnRow = currentBuffer[bufferRowCounter++];
- } else {
- this.returnRow = getRowFromStream();
- }
- }
-
- private void fillDataForPrefetch() {
- if (bufferRowCounter >= bufferSize) {
- if (isBackupFilled) {
- bufferRowCounter = 0;
- currentBuffer = backupBuffer;
- totalRecordFetch += currentBuffer.length;
- isBackupFilled = false;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- } else {
- try {
- submit.get();
- } catch (Exception e) {
- LOGGER.error(e);
- }
- bufferRowCounter = 0;
- currentBuffer = backupBuffer;
- isBackupFilled = false;
- totalRecordFetch += currentBuffer.length;
- if (totalRecordFetch < this.entryCount) {
- submit = executorService.submit(new DataFetcher(true));
- }
- }
- }
- prefetchRecordsProceesed++;
- returnRow = currentBuffer[bufferRowCounter++];
- }
-
- /**
- * Reads row from file
- * @return Object[]
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
- Object[] holder = new Object[3];
- int index = 0;
- int nonDicIndex = 0;
- int[] dim = new int[this.dimensionCount - this.noDictionaryCount];
- byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
- Object[] measures = new Object[this.measureCount];
- try {
- // read dimension values
- for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
- if (isNoDictionaryDimensionColumn[i]) {
- short len = stream.readShort();
- byte[] array = new byte[len];
- stream.readFully(array);
- nonDicArray[nonDicIndex++] = array;
- } else {
- dim[index++] = stream.readInt();
- }
- }
-
- for (int i = 0; i < complexDimensionCount; i++) {
- short len = stream.readShort();
- byte[] array = new byte[len];
- stream.readFully(array);
- nonDicArray[nonDicIndex++] = array;
- }
-
- index = 0;
- // read measure values
- for (int i = 0; i < this.measureCount; i++) {
- if (stream.readByte() == 1) {
- switch (aggType[i]) {
- case SHORT:
- measures[index++] = stream.readShort();
- break;
- case INT:
- measures[index++] = stream.readInt();
- break;
- case LONG:
- measures[index++] = stream.readLong();
- break;
- case DOUBLE:
- measures[index++] = stream.readDouble();
- break;
- case DECIMAL:
- int len = stream.readInt();
- byte[] buff = new byte[len];
- stream.readFully(buff);
- measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
- break;
- default:
- throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
- }
- } else {
- measures[index++] = null;
- }
- }
-
- NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
- // increment number if record read
- this.numberOfObjectRead++;
- } catch (IOException e) {
- LOGGER.error("Problme while reading the madkey fom sort temp file");
- throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
- }
-
- //return out row
- return holder;
- }
-
- /**
- * below method will be used to get the row
- *
- * @return row
- */
- public Object[] getRow() {
- return this.returnRow;
- }
-
- /**
- * below method will be used to check whether any more records are present
- * in file or not
- *
- * @return more row present in file
- */
- public boolean hasNext() {
- if (prefetch || isSortTempFileCompressionEnabled) {
- return this.prefetchRecordsProceesed < this.entryCount;
- }
- return this.numberOfObjectRead < this.entryCount;
- }
-
- /**
- * Below method will be used to close streams
- */
- public void closeStream() {
- CarbonUtil.closeStreams(stream);
- executorService.shutdown();
- this.backupBuffer = null;
- this.currentBuffer = null;
- }
-
- /**
- * This method will number of entries
- *
- * @return entryCount
- */
- public int getEntryCount() {
- return entryCount;
- }
-
- @Override public int compareTo(SortTempFileChunkHolder other) {
- int diff = 0;
- int index = 0;
- int noDictionaryIndex = 0;
- int[] leftMdkArray = (int[]) returnRow[0];
- int[] rightMdkArray = (int[]) other.returnRow[0];
- byte[][] leftNonDictArray = (byte[][]) returnRow[1];
- byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
- for (boolean isNoDictionary : isNoDictionarySortColumn) {
- if (isNoDictionary) {
- diff = UnsafeComparer.INSTANCE
- .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
- if (diff != 0) {
- return diff;
- }
- noDictionaryIndex++;
- } else {
- diff = leftMdkArray[index] - rightMdkArray[index];
- if (diff != 0) {
- return diff;
- }
- index++;
- }
-
- }
- return diff;
- }
-
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof SortTempFileChunkHolder)) {
- return false;
- }
- SortTempFileChunkHolder o = (SortTempFileChunkHolder) obj;
-
- return this == o;
- }
-
- @Override public int hashCode() {
- int hash = 0;
- hash += 31 * measureCount;
- hash += 31 * dimensionCount;
- hash += 31 * complexDimensionCount;
- hash += 31 * noDictionaryCount;
- hash += tempFile.hashCode();
- return hash;
- }
-
- private final class DataFetcher implements Callable<Void> {
- private boolean isBackUpFilling;
-
- private int numberOfRecords;
-
- private DataFetcher(boolean backUp) {
- isBackUpFilling = backUp;
- calculateNumberOfRecordsToBeFetched();
- }
-
- private void calculateNumberOfRecordsToBeFetched() {
- int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
- numberOfRecords =
- bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
- }
-
- @Override public Void call() throws Exception {
- try {
- if (isBackUpFilling) {
- backupBuffer = prefetchRecordsFromFile(numberOfRecords);
- isBackupFilled = true;
- } else {
- currentBuffer = prefetchRecordsFromFile(numberOfRecords);
- }
- } catch (Exception e) {
- LOGGER.error(e);
- }
- return null;
- }
-
- }
-
- /**
- * This method will read the records from sort temp file and keep it in a buffer
- *
- * @param numberOfRecords
- * @return
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[][] prefetchRecordsFromFile(int numberOfRecords)
- throws CarbonSortKeyAndGroupByException {
- Object[][] records = new Object[numberOfRecords][];
- for (int i = 0; i < numberOfRecords; i++) {
- records[i] = getRowFromStream();
- }
- return records;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
deleted file mode 100644
index f0bac85..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class SortTempFileChunkWriter implements TempSortFileWriter {
- /**
- * writer
- */
- private TempSortFileWriter writer;
-
- /**
- * recordPerLeaf
- */
- private int recordPerLeaf;
-
- /**
- * CarbonCompressedSortTempFileChunkWriter
- *
- * @param writer
- */
- public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) {
- this.writer = writer;
- this.recordPerLeaf = recordPerLeaf;
- }
-
- /**
- * initialize
- */
- public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException {
- this.writer.initiaize(file, entryCount);
- }
-
- /**
- * finish
- */
- public void finish() {
- this.writer.finish();
- }
-
- /**
- * Below method will be used to write the sort temp file chunk by chunk
- */
- public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
- int recordCount = 0;
- Object[][] tempRecords;
- while (recordCount < records.length) {
- if (records.length - recordCount < recordPerLeaf) {
- recordPerLeaf = records.length - recordCount;
- }
- tempRecords = new Object[recordPerLeaf][];
- System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf);
- recordCount += recordPerLeaf;
- this.writer.writeSortTempFile(tempRecords);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
deleted file mode 100644
index 2bf657e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-public interface TempSortFileReader {
- /**
- * below method will be used to close the file holder
- */
- void finish();
-
- /**
- * Below method will be used to get the row
- */
- Object[][] getRow();
-
- /**
- * Below method will be used to get the total row count in temp file
- *
- * @return
- */
- int getEntryCount();
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
deleted file mode 100644
index 6679c8e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public interface TempSortFileWriter {
- /**
- * Method will be used to initialize
- *
- * @param file
- * @param entryCount
- * @throws CarbonSortKeyAndGroupByException
- */
- void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException;
-
- /**
- * Method will be used to finish
- */
- void finish();
-
- /**
- * Below method will be used to write the sort temp file
- *
- * @param records
- * @throws CarbonSortKeyAndGroupByException
- */
- void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
deleted file mode 100644
index c0e8c6e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-public final class TempSortFileWriterFactory {
- private static final TempSortFileWriterFactory WRITERFACTORY = new TempSortFileWriterFactory();
-
- private TempSortFileWriterFactory() {
-
- }
-
- public static TempSortFileWriterFactory getInstance() {
- return WRITERFACTORY;
- }
-
- public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount,
- int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) {
- if (isCompressionEnabled) {
- return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
- noDictionaryCount, writeBufferSize);
- } else {
- return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
- noDictionaryCount, writeBufferSize);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
deleted file mode 100644
index 51b3964..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
- /**
- * UnCompressedTempSortFileWriter
- *
- * @param writeBufferSize
- * @param dimensionCount
- * @param measureCount
- */
- public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
- int measureCount, int noDictionaryCount, int writeBufferSize) {
- super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
- }
-
- public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream,
- int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount)
- throws IOException {
- Object[] row;
- for (int recordIndex = 0; recordIndex < records.length; recordIndex++) {
- row = records[recordIndex];
- int fieldIndex = 0;
-
- for (int counter = 0; counter < dimensionCount; counter++) {
- dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
- }
-
- //write byte[] of high card dims
- if (noDictionaryCount > 0) {
- dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
- }
- fieldIndex = 0;
- for (int counter = 0; counter < complexDimensionCount; counter++) {
- int complexByteArrayLength = ((byte[]) row[fieldIndex]).length;
- dataOutputStream.writeInt(complexByteArrayLength);
- dataOutputStream.write(((byte[]) row[fieldIndex++]));
- }
-
- for (int counter = 0; counter < measureCount; counter++) {
- if (null != row[fieldIndex]) {
- dataOutputStream.write((byte) 1);
- dataOutputStream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else {
- dataOutputStream.write((byte) 0);
- }
-
- fieldIndex++;
- }
-
- }
- }
-
- /**
- * Below method will be used to write the sort temp file
- *
- * @param records
- */
- public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
- ByteArrayOutputStream blockDataArray = null;
- DataOutputStream dataOutputStream = null;
- int totalSize = 0;
- int recordSize = 0;
- try {
- recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
- * CarbonCommonConstants.INT_SIZE_IN_BYTE);
- totalSize = records.length * recordSize;
-
- blockDataArray = new ByteArrayOutputStream(totalSize);
- dataOutputStream = new DataOutputStream(blockDataArray);
-
- writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
- noDictionaryCount, complexDimensionCount);
- stream.writeInt(records.length);
- byte[] byteArray = blockDataArray.toByteArray();
- stream.writeInt(byteArray.length);
- stream.write(byteArray);
-
- } catch (IOException e) {
- throw new CarbonSortKeyAndGroupByException(e);
- } finally {
- CarbonUtil.closeStreams(blockDataArray);
- CarbonUtil.closeStreams(dataOutputStream);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
deleted file mode 100644
index 39d1234..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Base class for readers that scan carbon blocks with the query executor framework. It uses a
- * raw detail query that projects every dimension and every measure of the fact table.
- */
-public abstract class AbstractCarbonQueryExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
-  protected CarbonTable carbonTable;
-  protected QueryModel queryModel;
-  protected QueryExecutor queryExecutor;
-  protected Map<String, TaskBlockInfo> segmentMapping;
-
-  /**
-   * Points {@link #queryModel} at the given blocks, creates a query executor for it and
-   * executes the query. {@link #queryModel} must already be prepared.
-   *
-   * <p>Each call replaces {@link #queryExecutor}, so {@link #finish()} only releases the
-   * executor created by the most recent call.
-   *
-   * @param blockList blocks to read
-   * @return iterator over the result batches
-   * @throws QueryExecutionException if the query cannot be executed
-   * @throws IOException propagated from the query executor
-   */
-  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
-      throws QueryExecutionException, IOException {
-    queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-    return queryExecutor.execute(queryModel);
-  }
-
-  /**
-   * Builds a raw detail query model without a filter. It projects every dimension and every
-   * measure of the fact table.
-   *
-   * @param blockList blocks to read; may be null when the blocks are supplied later through
-   *     {@link #executeBlockList(List)}
-   * @return the prepared query model
-   */
-  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    String factTableName = carbonTable.getFactTableName();
-
-    // every dimension of the fact table is projected; no column is filtered out here
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (CarbonDimension dim : carbonTable.getDimensionByTableName(factTableName)) {
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    // every measure of the fact table is projected as well
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (CarbonMeasure carbonMeasure : carbonTable.getMeasureByTableName(factTableName)) {
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-
-    // the current nano time is used as the query id
-    model.setQueryId(String.valueOf(System.nanoTime()));
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
-   * Releases the current query executor, if one was created, and the dictionary cache
-   * references held by the query model. Failures while finishing the executor are logged and
-   * not rethrown.
-   */
-  public void finish() {
-    // queryExecutor is only assigned by executeBlockList. The guard lets cleanup also work
-    // when no query was ever executed.
-    if (null != queryExecutor) {
-      try {
-        queryExecutor.finish();
-      } catch (QueryExecutionException e) {
-        LOGGER.error(e, "Problem while finish: ");
-      }
-    }
-    clearDictionaryFromQueryModel();
-  }
-
-  /**
-   * Clears the dictionary access count once its usage is complete, so that the column can be
-   * deleted from the LRU cache whenever memory reaches the threshold.
-   */
-  private void clearDictionaryFromQueryModel() {
-    if (null == queryModel) {
-      return;
-    }
-    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
-    if (null != columnToDictionaryMapping) {
-      for (Dictionary dictionary : columnToDictionaryMapping.values()) {
-        CarbonUtil.clearDictionaryCache(dictionary);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
deleted file mode 100644
index 7b724ee..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
-
-/**
- * Used to read carbon blocks when adding or splitting a partition.
- */
-public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());
-
-  /**
-   * @param segmentMapping task/block information of each segment, keyed by segment id
-   * @param carbonTable    table whose blocks are read
-   */
-  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
-    this.segmentMapping = segmentMapping;
-    this.carbonTable = carbonTable;
-  }
-
-  /**
-   * Starts one raw query per task of the given segment and wraps each result in a
-   * {@link PartitionSpliterRawResultIterator}.
-   *
-   * @param segmentId segment whose blocks should be read
-   * @return one iterator per task of the segment
-   * @throws QueryExecutionException if a query cannot be executed
-   * @throws IOException if the segment has no entry in the segment mapping, or if reading fails
-   */
-  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
-      throws QueryExecutionException, IOException {
-    // the block list is supplied per task through executeBlockList
-    queryModel = prepareQueryModel(null);
-    TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);
-    if (null == taskBlockInfo) {
-      // fail with a descriptive message instead of a NullPointerException below
-      throw new IOException("No task block information found for segment " + segmentId);
-    }
-    List<PartitionSpliterRawResultIterator> resultList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> tasks = taskBlockInfo.getTaskSet();
-    for (String task : tasks) {
-      List<TableBlockInfo> blocks = taskBlockInfo.getTableBlockInfoList(task);
-      LOGGER.info("for task -" + task + "-block size is -" + blocks.size());
-      // NOTE(review): executeBlockList replaces the inherited queryExecutor on every call,
-      // so finish() only releases the executor of the last task. Earlier executors are
-      // never finished; consider tracking all of them.
-      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(blocks)));
-    }
-    return resultList;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
deleted file mode 100644
index 9316c9f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.spliter;
-
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class RowResultProcessor {
-
- private CarbonFactHandler dataHandler;
- private SegmentProperties segmentProperties;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RowResultProcessor.class.getName());
-
-
- public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
- SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
- CarbonDataProcessorUtil.createLocations(tempStoreLocation);
- this.segmentProperties = segProp;
- String tableName = carbonTable.getFactTableName();
- CarbonFactDataHandlerModel carbonFactDataHandlerModel =
- CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
- segProp, tableName, tempStoreLocation);
- CarbonDataFileAttributes carbonDataFileAttributes =
- new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
- loadModel.getFactTimeStamp());
- carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
- carbonFactDataHandlerModel.setBucketId(bucketId);
- //Note: set compaction flow just to convert decimal type
- carbonFactDataHandlerModel.setCompactionFlow(true);
- dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
- }
-
- public boolean execute(List<Object[]> resultList) {
- boolean processStatus;
- boolean isDataPresent = false;
-
- try {
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- for (Object[] row: resultList) {
- addRow(row);
- }
- if (isDataPresent)
- {
- this.dataHandler.finish();
- }
- processStatus = true;
- } catch (AlterPartitionSliceException e) {
- LOGGER.error(e, e.getMessage());
- LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
- processStatus = false;
- } finally {
- try {
- if (isDataPresent) {
- this.dataHandler.closeHandler();
- }
- } catch (Exception e) {
- LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
- processStatus = false;
- }
- }
- return processStatus;
- }
-
- private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
- CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
- try {
- this.dataHandler.addDataToStore(row);
- } catch (CarbonDataWriterException e) {
- throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
deleted file mode 100644
index 0e53a1f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter.exception;
-
-import java.util.Locale;
-
-/**
- * Checked exception used by the partition split code, for example when RowResultProcessor
- * fails to add rows to the data handler.
- */
-public class AlterPartitionSliceException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The error message; assigned once by the constructors and returned by getMessage().
-   */
-  private final String msg;
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public AlterPartitionSliceException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   * @param t   The underlying cause.
-   */
-  public AlterPartitionSliceException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Locale-specific message lookup. Localisation is not supported: this always returns an
-   * empty string, whatever the locale.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - always the empty string.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * Delegates to {@link Throwable#getLocalizedMessage()}, which returns getMessage().
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * @return the message passed to the constructor
-   */
-  @Override public String getMessage() {
-    return this.msg;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java b/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
new file mode 100644
index 0000000..c7d5dd8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.splits;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.partition.Partition;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * It represents one region server as one split.
+ *
+ * <p>The {@link Writable} form is written in this order:
+ * <ul>
+ *   <li>the number of locations;</li>
+ *   <li>each location as a length-prefixed UTF-8 byte array;</li>
+ *   <li>the length-prefixed Java serialization of the {@link Partition}.</li>
+ * </ul>
+ */
+public class TableSplit implements Serializable, Writable {
+  private static final long serialVersionUID = -8058151330863145575L;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableSplit.class.getName());
+
+  /**
+   * Charset for the serialized locations. It is fixed rather than the platform default, so the
+   * JVM that reads a split decodes the bytes the same way the writing JVM encoded them.
+   */
+  private static final Charset LOCATION_CHARSET = Charset.forName("UTF-8");
+
+  private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  private Partition partition;
+
+  /**
+   * @return the locations
+   */
+  public List<String> getLocations() {
+    return locations;
+  }
+
+  /**
+   * @param locations the locations to set
+   */
+  public void setLocations(List<String> locations) {
+    this.locations = locations;
+  }
+
+  /**
+   * @return Returns the partitions.
+   */
+  public Partition getPartition() {
+    return partition;
+  }
+
+  /**
+   * @param partition The partitions to set.
+   */
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  /**
+   * Restores this split from {@code in}. Previously held state is replaced, not merged, since
+   * Hadoop may reuse Writable instances.
+   *
+   * @param in data produced by {@link #write(DataOutput)}
+   * @throws IOException on read failure, on a negative location count, or when the partition
+   *     class cannot be found
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    int sizeLoc = in.readInt();
+    if (sizeLoc < 0) {
+      throw new IOException("Invalid number of split locations: " + sizeLoc);
+    }
+    // Build a fresh list. Appending to the current one would duplicate locations when the
+    // instance is reused, and would fail if an unmodifiable list had been set.
+    List<String> readLocations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    for (int i = 0; i < sizeLoc; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      readLocations.add(new String(b, LOCATION_CHARSET));
+    }
+    locations = readLocations;
+
+    byte[] buf = new byte[in.readInt()];
+    in.readFully(buf);
+    // NOTE(review): this is native Java deserialization. It is only safe if split bytes always
+    // come from write() within the same job and never from an untrusted source; confirm this.
+    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buf))) {
+      partition = (Partition) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      LOGGER.error(e, e.getMessage());
+      // do not leave a stale or null partition behind silently
+      throw new IOException("Unable to deserialize the partition of the table split", e);
+    }
+  }
+
+  /**
+   * Serializes the locations (UTF-8) followed by the Java-serialized partition.
+   *
+   * @param out destination of the serialized split
+   * @throws IOException on write failure
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(locations.size());
+    for (String location : locations) {
+      byte[] bytes = location.getBytes(LOCATION_CHARSET);
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try (ObjectOutputStream obs = new ObjectOutputStream(bos)) {
+      obs.writeObject(partition);
+    }
+    byte[] byteArray = bos.toByteArray();
+    out.writeInt(byteArray.length);
+    out.write(byteArray);
+  }
+
+  /**
+   * @return the partition id followed by the locations. "null" is printed when no partition
+   *     is set.
+   */
+  @Override public String toString() {
+    String partitionId = null == partition ? "null" : String.valueOf(partition.getUniqueID());
+    return partitionId + ' ' + locations;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
index 0b606b0..b69815e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
@@ -17,20 +17,11 @@
package org.apache.carbondata.processing.store;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
/**
* This class contains attributes of file which are required to
* construct file name like taskId, factTimeStamp
*/
public class CarbonDataFileAttributes {
-
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonDataFileAttributes.class.getName());
/**
* task Id which is unique for each spark task
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 0fe922d..7a5cc11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImpl
/**
* Factory class to get the writer instance
*/
-public class CarbonDataWriterFactory {
+class CarbonDataWriterFactory {
/**
* static instance
@@ -56,7 +56,7 @@ public class CarbonDataWriterFactory {
* @param carbonDataWriterVo writer vo object
* @return writer instance
*/
- public CarbonFactDataWriter<?> getFactDataWriter(final ColumnarFormatVersion version,
+ public CarbonFactDataWriter getFactDataWriter(final ColumnarFormatVersion version,
final CarbonDataWriterVo carbonDataWriterVo) {
switch (version) {
case V1:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index c4a5fc5..2c275bf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -88,10 +88,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* once this size of input is reached
*/
private int pageSize;
- /**
- * keyBlockHolder
- */
- private CarbonKeyBlockHolder[] keyBlockHolder;
// This variable is true if it is dictionary dimension and its cardinality is lower than
// property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
@@ -455,7 +451,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.dataWriter.closeWriter();
}
this.dataWriter = null;
- this.keyBlockHolder = null;
}
/**
@@ -488,15 +483,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
//than below splitter will return column as {0,1,2}{3}{4}{5}
ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
- this.keyBlockHolder =
- new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
- } else {
- this.keyBlockHolder = new CarbonKeyBlockHolder[0];
- }
-
- for (int i = 0; i < keyBlockHolder.length; i++) {
- this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
- this.keyBlockHolder[i].resetCounter();
}
// agg type
@@ -567,7 +553,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*
* @return data writer instance
*/
- private CarbonFactDataWriter<?> getFactDataWriter() {
+ private CarbonFactDataWriter getFactDataWriter() {
return CarbonDataWriterFactory.getInstance()
.getFactDataWriter(version, getDataWriterVo());
}
@@ -680,10 +666,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return tablePage;
}
- /**
- * @param encodedTablePage
- * @param index
- */
public synchronized void put(TablePage tablePage, int index) {
tablePages[index] = tablePage;
// notify the consumer thread when index at which object is to be inserted
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 544a26a..2c346b2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -40,10 +40,10 @@ import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
// This class contains all the data required for processing and writing the carbon data
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
deleted file mode 100644
index 898917b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-/**
- * Fixed-capacity buffer of byte[] entries, together with a count of how many entries were
- * added since construction or the last reset.
- */
-public class CarbonKeyBlockHolder {
-  /** Backing storage; its length is the capacity passed to the constructor. */
-  private final byte[][] rows;
-
-  /** Number of addRowToBlock calls since construction or the last resetCounter(). */
-  private int filled;
-
-  public CarbonKeyBlockHolder(int size) {
-    rows = new byte[size][];
-  }
-
-  /**
-   * Stores {@code rowKey} at {@code index} and counts the addition. The count grows even when
-   * the slot was already occupied.
-   */
-  public void addRowToBlock(int index, byte[] rowKey) {
-    this.rows[index] = rowKey;
-    filled++;
-  }
-
-  /**
-   * Returns the backing array itself once the count has reached the capacity. Otherwise it
-   * returns a copy of the first {@code count} slots.
-   */
-  public byte[][] getKeyBlock() {
-    if (filled >= rows.length) {
-      return rows;
-    }
-    byte[][] firstFilled = new byte[filled][];
-    System.arraycopy(rows, 0, firstFilled, 0, filled);
-    return firstFilled;
-  }
-
-  /** Resets the count to zero; previously stored entries stay in the backing array. */
-  public void resetCounter() {
-    filled = 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
deleted file mode 100644
index 48227d1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SingleThreadFinalSortFilesMerger.class.getName());
-
- /**
- * lockObject
- */
- private static final Object LOCKOBJECT = new Object();
-
- /**
- * fileCounter
- */
- private int fileCounter;
-
- /**
- * fileBufferSize
- */
- private int fileBufferSize;
-
- /**
- * recordHolderHeap
- */
- private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal;
-
- /**
- * tableName
- */
- private String tableName;
-
- /**
- * measureCount
- */
- private int measureCount;
-
- /**
- * dimensionCount
- */
- private int dimensionCount;
-
- /**
- * measure count
- */
- private int noDictionaryCount;
-
- /**
- * complexDimensionCount
- */
- private int complexDimensionCount;
-
- /**
- * tempFileLocation
- */
- private String[] tempFileLocation;
-
- private DataType[] measureDataType;
-
- /**
- * below code is to check whether dimension
- * is of no dictionary type or not
- */
- private boolean[] isNoDictionaryColumn;
-
- private boolean[] isNoDictionarySortColumn;
-
- public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
- int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
- DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
- this.tempFileLocation = tempFileLocation;
- this.tableName = tableName;
- this.dimensionCount = dimensionCount;
- this.complexDimensionCount = complexDimensionCount;
- this.measureCount = measureCount;
- this.measureDataType = type;
- this.noDictionaryCount = noDictionaryCount;
- this.isNoDictionaryColumn = isNoDictionaryColumn;
- this.isNoDictionarySortColumn = isNoDictionarySortColumn;
- }
-
- /**
- * This method will be used to merger the merged files
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- public void startFinalMerge() throws CarbonDataWriterException {
- List<File> filesToMerge = getFilesToMergeSort();
- if (filesToMerge.size() == 0)
- {
- LOGGER.info("No file to merge in final merge stage");
- return;
- }
-
- startSorting(filesToMerge);
- }
-
- private List<File> getFilesToMergeSort() {
- FileFilter fileFilter = new FileFilter() {
- public boolean accept(File pathname) {
- return pathname.getName().startsWith(tableName);
- }
- };
-
- // get all the merged files
- List<File> files = new ArrayList<File>(tempFileLocation.length);
- for (String tempLoc : tempFileLocation)
- {
- File[] subFiles = new File(tempLoc).listFiles(fileFilter);
- if (null != subFiles && subFiles.length > 0)
- {
- files.addAll(Arrays.asList(subFiles));
- }
- }
-
- return files;
- }
-
- /**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
- *
- * @throws CarbonSortKeyAndGroupByException
- */
- private void startSorting(List<File> files) throws CarbonDataWriterException {
- this.fileCounter = files.size();
- if (fileCounter == 0) {
- LOGGER.info("No files to merge sort");
- return;
- }
- this.fileBufferSize = CarbonDataProcessorUtil
- .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
- CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- LOGGER.info("Number of temp file: " + this.fileCounter);
-
- LOGGER.info("File Buffer Size: " + this.fileBufferSize);
-
- // create record holder heap
- createRecordHolderQueue();
-
- // iterate over file list and create chunk holder and add to heap
- LOGGER.info("Started adding first record from each file");
- int maxThreadForSorting = 0;
- try {
- maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
- CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
- } catch (NumberFormatException e) {
- maxThreadForSorting =
- Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
- }
- ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
-
- for (final File tempFile : files) {
-
- Runnable runnable = new Runnable() {
- @Override public void run() {
-
- // create chunk holder
- SortTempFileChunkHolder sortTempFileChunkHolder =
- new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
- measureCount, fileBufferSize, noDictionaryCount, measureDataType,
- isNoDictionaryColumn, isNoDictionarySortColumn);
- try {
- // initialize
- sortTempFileChunkHolder.initialize();
- sortTempFileChunkHolder.readRow();
- } catch (CarbonSortKeyAndGroupByException ex) {
- LOGGER.error(ex);
- }
-
- synchronized (LOCKOBJECT) {
- recordHolderHeapLocal.add(sortTempFileChunkHolder);
- }
- }
- };
- service.execute(runnable);
- }
- service.shutdown();
-
- try {
- service.awaitTermination(2, TimeUnit.HOURS);
- } catch (Exception e) {
- throw new CarbonDataWriterException(e.getMessage(), e);
- }
-
- LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
- }
-
- /**
- * This method will be used to create the heap which will be used to hold
- * the chunk of data
- */
- private void createRecordHolderQueue() {
- // creating record holder heap
- this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
- }
-
- /**
- * This method will be used to get the sorted row
- *
- * @return sorted row
- * @throws CarbonSortKeyAndGroupByException
- */
- public Object[] next() {
- return getSortedRecordFromFile();
- }
-
- /**
- * This method will be used to get the sorted record from file
- *
- * @return sorted record sorted record
- * @throws CarbonSortKeyAndGroupByException
- */
- private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
- Object[] row = null;
-
- // poll the top object from heap
- // heap maintains binary tree which is based on heap condition that will
- // be based on comparator we are passing the heap
- // when will call poll it will always delete root of the tree and then
- // it does trickel down operation complexity is log(n)
- SortTempFileChunkHolder poll = this.recordHolderHeapLocal.poll();
-
- // get the row from chunk
- row = poll.getRow();
-
- // check if there no entry present
- if (!poll.hasNext()) {
- // if chunk is empty then close the stream
- poll.closeStream();
-
- // change the file counter
- --this.fileCounter;
-
- // reaturn row
- return row;
- }
-
- // read new row
- try {
- poll.readRow();
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataWriterException(e.getMessage(), e);
- }
-
- // add to heap
- this.recordHolderHeapLocal.add(poll);
-
- // return row
- return row;
- }
-
- /**
- * This method will be used to check whether any more element is present or
- * not
- *
- * @return more element is present
- */
- public boolean hasNext() {
- return this.fileCounter > 0;
- }
-
- public void clear() {
- if (null != recordHolderHeapLocal) {
- recordHolderHeapLocal = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
deleted file mode 100644
index a2e22c2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-
-/**
- * This will hold column group data.
- */
-public class ColGroupDataHolder implements DataHolder {
-
- private int noOfRecords;
-
- /**
- * colGrpData[row no][data]
- */
- private byte[][] colGrpData;
-
- /**
- * This will have min max value of each chunk
- */
- private ColGroupMinMax colGrpMinMax;
-
- /**
- * each row size of this column group block
- */
- private int keyBlockSize;
-
- /**
- * @param keyBlockSize
- * @param noOfRecords
- * @param colGrpMinMax
- */
- public ColGroupDataHolder(int keyBlockSize,
- int noOfRecords,ColGroupMinMax colGrpMinMax) {
- this.noOfRecords = noOfRecords;
- this.keyBlockSize = keyBlockSize;
- this.colGrpMinMax = colGrpMinMax;
- colGrpData = new byte[noOfRecords][];
- }
-
- @Override public void addData(byte[] rowsData, int rowIndex) {
- colGrpData[rowIndex] = rowsData;
- colGrpMinMax.add(rowsData);
- }
-
- /**
- * this will return min of each chunk
- *
- * @return
- */
- public byte[] getMin() {
- return colGrpMinMax.getMin();
- }
-
- /**
- * this will return max of each chunk
- *
- * @return
- */
- public byte[] getMax() {
- return colGrpMinMax.getMax();
- }
-
- /**
- * Return size of this column group block
- *
- * @return
- */
- public int getKeyBlockSize() {
- return keyBlockSize;
- }
-
- @Override public byte[][] getData() {
- return colGrpData;
- }
-
- /**
- * return total size required by this block
- *
- * @return
- */
- public int getTotalSize() {
- return noOfRecords * keyBlockSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
deleted file mode 100644
index b3d11f2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * it gives min max of each column of column group
- */
-public class ColGroupMinMax {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(ColGroupMinMax.class.getName());
-
- /**
- * key generator
- */
- private KeyGenerator keyGenerator;
-
- /**
- * no of column in column group
- */
- private int noOfCol;
-
- /**
- * min value of each column
- */
- private byte[][] min;
-
- /**
- * max value of each column
- */
- private byte[][] max;
-
- /**
- * mask byte range
- */
- private int[][] maskByteRange;
-
- /**
- * max keys
- */
- private byte[][] maxKeys;
-
- public ColGroupMinMax(SegmentProperties segmentProperties, int colGroupId) {
- this.keyGenerator = segmentProperties.getColumnGroupAndItsKeygenartor().get(colGroupId);
- this.noOfCol = segmentProperties.getNoOfColumnsInColumnGroup(colGroupId);
- min = new byte[noOfCol][];
- max = new byte[noOfCol][];
- initialise();
- }
-
- /**
- * @param mdkey
- */
- public void add(byte[] mdkey) {
- for (int i = 0; i < noOfCol; i++) {
- byte[] col = getMaskedKey(mdkey, maskByteRange[i], maxKeys[i]);
- setMin(col, i);
- setMax(col, i);
- }
- }
-
- /**
- * Below method will be used to get the masked key
- *
- * @param data
- * @return maskedKey
- */
- private byte[] getMaskedKey(byte[] data, int[] maskByteRange, byte[] maxKey) {
- int keySize = maskByteRange.length;
- byte[] maskedKey = new byte[keySize];
- int counter = 0;
- int byteRange = 0;
- for (int i = 0; i < keySize; i++) {
- byteRange = maskByteRange[i];
- maskedKey[counter++] = (byte) (data[byteRange] & maxKey[byteRange]);
- }
- return maskedKey;
- }
-
- /**
- * intitialising data required for min max calculation
- */
- private void initialise() {
- try {
- maskByteRange = new int[noOfCol][];
- maxKeys = new byte[noOfCol][];
- for (int i = 0; i < noOfCol; i++) {
- maskByteRange[i] = getMaskByteRange(i);
- // generating maxkey
- long[] maxKey = new long[noOfCol];
- maxKey[i] = Long.MAX_VALUE;
- maxKeys[i] = keyGenerator.generateKey(maxKey);
- }
- } catch (KeyGenException e) {
- LOGGER.error(e, "Key generation failed while evaulating column group min max");
- }
-
- }
-
- /**
- * get range for given column in generated md key
- *
- * @param col : column
- * @return maskByteRange
- */
- private int[] getMaskByteRange(int col) {
- Set<Integer> integers = new HashSet<>();
- int[] range = keyGenerator.getKeyByteOffsets(col);
- for (int j = range[0]; j <= range[1]; j++) {
- integers.add(j);
- }
- int[] byteIndexs = new int[integers.size()];
- int j = 0;
- for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext(); ) {
- Integer integer = (Integer) iterator.next();
- byteIndexs[j++] = integer.intValue();
- }
- return byteIndexs;
- }
-
- /**
- * set min value of given column
- *
- * @param colData
- * @param column
- */
- private void setMin(byte[] colData, int column) {
-
- if (null == min[column]) {
- min[column] = colData;
- } else {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(colData, min[column]) < 0) {
- min[column] = colData;
- }
- }
- }
-
- /**
- * set max value of given column
- *
- * @param colData
- * @param column
- */
- private void setMax(byte[] colData, int column) {
- if (null == max[column]) {
- max[column] = colData;
- } else {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(colData, max[column]) > 0) {
- max[column] = colData;
- }
-
- }
- }
-
- /**
- * Get min value of block
- *
- * @return min value of block
- */
- public byte[] getMin() {
- int size = 0;
- for (int i = 0; i < noOfCol; i++) {
- size += min[i].length;
- }
- ByteBuffer bb = ByteBuffer.allocate(size);
- for (int i = 0; i < noOfCol; i++) {
- bb.put(min[i]);
- }
- return bb.array();
- }
-
- /**
- * get max value of block
- *
- * @return max value of block
- */
- public byte[] getMax() {
- int size = 0;
- for (int i = 0; i < noOfCol; i++) {
- size += max[i].length;
- }
- ByteBuffer bb = ByteBuffer.allocate(size);
- for (int i = 0; i < noOfCol; i++) {
- bb.put(max[i]);
- }
- return bb.array();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
deleted file mode 100644
index 8caf339..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-/**
- * Store mdkey data for each column block
- */
-public class ColumnDataHolder implements DataHolder {
-
- private byte[][] data;
-
- public ColumnDataHolder(int noOfRow) {
- data = new byte[noOfRow][];
- }
-
- @Override public void addData(byte[] rowRecord, int rowIndex) {
- data[rowIndex] = rowRecord;
- }
-
- @Override public byte[][] getData() {
- return data;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
deleted file mode 100644
index 3b1ee81..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-/**
- * Hold complete data for a leaf node
- */
-public interface DataHolder {
-
- /**
- * add row to holder
- *
- * @param rowRecord: row data
- * @param rowIndex : row number
- */
- public void addData(byte[] rowRecord, int rowIndex);
-
- /**
- * return the data when required
- *
- * @return all data
- */
- public byte[][] getData();
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index ec42596..acb3b3b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -64,7 +64,7 @@ import org.apache.carbondata.processing.store.file.FileData;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IOUtils;
-public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T> {
+public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index e195d10..3a2fa1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.store.TablePage;
-public interface CarbonFactDataWriter<T> {
+public interface CarbonFactDataWriter {
/**
* write a encoded table page