You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:22 UTC
[37/52] [partial] incubator-carbondata git commit: Renamed packages
to org.apache.carbondata and fixed errors
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
new file mode 100644
index 0000000..05c76ef
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class performs the functionality of reading the dictionary metadata file
+ */
+public class CarbonDictionaryMetadataReaderImpl implements CarbonDictionaryMetadataReader {
+
+ /**
+ * carbon table identifier
+ */
+ protected CarbonTableIdentifier carbonTableIdentifier;
+
+ /**
+ * HDFS store path
+ */
+ protected String hdfsStorePath;
+
+ /**
+ * column identifier
+ */
+ protected ColumnIdentifier columnIdentifier;
+
+ /**
+ * dictionary metadata file path
+ */
+ protected String columnDictionaryMetadataFilePath;
+
+ /**
+ * dictionary metadata thrift file reader; lazily created on first read and
+ * released by close()
+ */
+ private ThriftReader dictionaryMetadataFileReader;
+
+ /**
+ * Constructor
+ *
+ * @param hdfsStorePath HDFS store path
+ * @param carbonTableIdentifier table identifier which will give table name and database name
+ * @param columnIdentifier column unique identifier
+ */
+ public CarbonDictionaryMetadataReaderImpl(String hdfsStorePath,
+ CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+ this.hdfsStorePath = hdfsStorePath;
+ this.carbonTableIdentifier = carbonTableIdentifier;
+ this.columnIdentifier = columnIdentifier;
+ initFileLocation();
+ }
+
+ /**
+ * This method will be used to read complete metadata file.
+ * Applicable scenarios:
+ * 1. Query execution. Whenever a query is executed then to read the dictionary file
+ * and define the query scope first dictionary metadata has to be read first.
+ * 2. If dictionary file is read using start and end offset then using this meta list
+ * we can count the total number of dictionary chunks present between the 2 offsets
+ * Note: the underlying thrift reader is left open after this call; the caller
+ * is responsible for invoking close()
+ *
+ * @return list of all dictionary meta chunks which contains information for each segment
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public List<CarbonDictionaryColumnMetaChunk> read() throws IOException {
+ List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunks =
+ new ArrayList<CarbonDictionaryColumnMetaChunk>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ CarbonDictionaryColumnMetaChunk columnMetaChunk = null;
+ ColumnDictionaryChunkMeta dictionaryChunkMeta = null;
+ // open dictionary meta thrift reader
+ openThriftReader();
+ // read till dictionary chunk count
+ while (dictionaryMetadataFileReader.hasNext()) {
+ // get the thrift object for dictionary chunk
+ dictionaryChunkMeta = (ColumnDictionaryChunkMeta) dictionaryMetadataFileReader.read();
+ // create a new instance of chunk meta wrapper using thrift object
+ columnMetaChunk = getNewInstanceOfCarbonDictionaryColumnMetaChunk(dictionaryChunkMeta);
+ dictionaryMetaChunks.add(columnMetaChunk);
+ }
+ return dictionaryMetaChunks;
+ }
+
+ /**
+ * This method will be used to read only the last entry of dictionary meta chunk.
+ * Applicable scenarios :
+ * 1. Global dictionary generation for incremental load. In this case only the
+ * last dictionary chunk meta entry has to be read to calculate min, max surrogate
+ * key and start and end offset for the new dictionary chunk.
+ * 2. Truncate operation. While writing dictionary file in case of incremental load
+ * dictionary file needs to be validated for any inconsistency. Here end offset of last
+ * dictionary chunk meta is validated with file size.
+ *
+ * @return last segment entry for dictionary chunk
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public CarbonDictionaryColumnMetaChunk readLastEntryOfDictionaryMetaChunk()
+ throws IOException {
+ ColumnDictionaryChunkMeta dictionaryChunkMeta = null;
+ // open dictionary meta thrift reader
+ openThriftReader();
+ // at the completion of while loop we will get the last dictionary chunk entry
+ while (dictionaryMetadataFileReader.hasNext()) {
+ // get the thrift object for dictionary chunk
+ dictionaryChunkMeta = (ColumnDictionaryChunkMeta) dictionaryMetadataFileReader.read();
+ }
+ // NOTE(review): if the metadata file contains no entries, dictionaryChunkMeta stays
+ // null and the call below will throw NPE — confirm callers guarantee a non-empty file
+ // create a new instance of chunk meta wrapper using thrift object
+ CarbonDictionaryColumnMetaChunk columnMetaChunkForLastSegment =
+ getNewInstanceOfCarbonDictionaryColumnMetaChunk(dictionaryChunkMeta);
+ return columnMetaChunkForLastSegment;
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void close() throws IOException {
+ if (null != dictionaryMetadataFileReader) {
+ dictionaryMetadataFileReader.close();
+ dictionaryMetadataFileReader = null;
+ }
+ }
+
+ /**
+ * This method will form the path for dictionary metadata file for a given column
+ */
+ protected void initFileLocation() {
+ PathService pathService = CarbonCommonFactory.getPathService();
+ CarbonTablePath carbonTablePath =
+ pathService.getCarbonTablePath(columnIdentifier, this.hdfsStorePath, carbonTableIdentifier);
+ this.columnDictionaryMetadataFilePath =
+ carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
+ }
+
+ /**
+ * This method will open the dictionary file stream for reading
+ *
+ * @throws IOException thrift reader open method throws IOException
+ */
+ private void openThriftReader() throws IOException {
+ // initialise dictionary file reader which will return dictionary thrift object
+ // dictionary thrift object contains a list of byte buffer
+ if (null == dictionaryMetadataFileReader) {
+ dictionaryMetadataFileReader =
+ new ThriftReader(this.columnDictionaryMetadataFilePath, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new ColumnDictionaryChunkMeta();
+ }
+ });
+ // Open it
+ dictionaryMetadataFileReader.open();
+ }
+
+ }
+
+ /**
+ * Given a thrift object this method will create a new wrapper class object
+ * for dictionary chunk
+ *
+ * @param dictionaryChunkMeta reference for chunk meta thrift object
+ * @return wrapper object of dictionary chunk meta
+ */
+ private CarbonDictionaryColumnMetaChunk getNewInstanceOfCarbonDictionaryColumnMetaChunk(
+ ColumnDictionaryChunkMeta dictionaryChunkMeta) {
+ CarbonDictionaryColumnMetaChunk columnMetaChunk =
+ new CarbonDictionaryColumnMetaChunk(dictionaryChunkMeta.getMin_surrogate_key(),
+ dictionaryChunkMeta.getMax_surrogate_key(), dictionaryChunkMeta.getStart_offset(),
+ dictionaryChunkMeta.getEnd_offset(), dictionaryChunkMeta.getChunk_count());
+ return columnMetaChunk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
new file mode 100644
index 0000000..dded6c2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * dictionary reader interface which declares methods for
+ * reading carbon dictionary files
+ */
+public interface CarbonDictionaryReader extends Closeable {
+ /**
+ * This method should be used when complete dictionary data needs to be read.
+ * Applicable scenarios :
+ * 1. Global dictionary generation in case of incremental load
+ * 2. Reading dictionary file on first time query
+ * 3. Loading a dictionary column in memory based on query requirement.
+ * This is a case where carbon column cache feature is enabled in which a
+ * column dictionary is read if it is present in the query.
+ *
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ List<byte[]> read() throws IOException;
+
+ /**
+ * This method should be used when data has to be read from a given offset.
+ * Applicable scenarios :
+ * 1. Incremental data load. If column dictionary is already loaded in memory
+ * and incremental load is done, then for the new query only new dictionary data
+ * has to be read from memory.
+ *
+ * @param startOffset start offset of dictionary file
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ List<byte[]> read(long startOffset) throws IOException;
+
+ /**
+ * This method will be used to read data between given start and end offset.
+ * Applicable scenarios:
+ * 1. Truncate operation. If there is any inconsistency while writing the dictionary file
+ * then we can give the start and end offset till where the data has to be retained.
+ *
+ * @param startOffset start offset of dictionary file
+ * @param endOffset end offset of dictionary file
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ List<byte[]> read(long startOffset, long endOffset) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
new file mode 100644
index 0000000..a843701
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnDictionaryChunk;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class performs the functionality of reading a carbon dictionary file.
+ * It implements various overloaded methods for read functionality.
+ */
+public class CarbonDictionaryReaderImpl implements CarbonDictionaryReader {
+
+ /**
+ * carbon table identifier
+ */
+ protected CarbonTableIdentifier carbonTableIdentifier;
+
+ /**
+ * HDFS store path
+ */
+ protected String hdfsStorePath;
+
+ /**
+ * column name
+ */
+ protected ColumnIdentifier columnIdentifier;
+
+ /**
+ * dictionary file path
+ */
+ protected String columnDictionaryFilePath;
+
+ /**
+ * dictionary thrift file reader; lazily created on first read and
+ * released by close()
+ */
+ private ThriftReader dictionaryFileReader;
+
+ /**
+ * Constructor
+ *
+ * @param hdfsStorePath HDFS store path
+ * @param carbonTableIdentifier table identifier which will give table name and database name
+ * @param columnIdentifier column unique identifier
+ */
+ public CarbonDictionaryReaderImpl(String hdfsStorePath,
+ CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+ this.hdfsStorePath = hdfsStorePath;
+ this.carbonTableIdentifier = carbonTableIdentifier;
+ this.columnIdentifier = columnIdentifier;
+ initFileLocation();
+ }
+
+ /**
+ * This method should be used when complete dictionary data needs to be read.
+ * Applicable scenarios :
+ * 1. Global dictionary generation in case of incremental load
+ * 2. Reading dictionary file on first time query
+ * 3. Loading a dictionary column in memory based on query requirement.
+ * This is a case where carbon column cache feature is enabled in which a
+ * column dictionary is read if it is present in the query.
+ *
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public List<byte[]> read() throws IOException {
+ return read(0L);
+ }
+
+ /**
+ * This method should be used when data has to be read from a given offset.
+ * Applicable scenarios :
+ * 1. Incremental data load. If column dictionary is already loaded in memory
+ * and incremental load is done, then for the new query only new dictionary data
+ * has to be read from memory.
+ *
+ * @param startOffset start offset of dictionary file
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public List<byte[]> read(long startOffset) throws IOException {
+ List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
+ readDictionaryMetadataFile();
+ // NOTE(review): if the metadata file has no entries the get(size - 1) call below
+ // throws IndexOutOfBoundsException — confirm callers guarantee a non-empty metadata file
+ // get the last entry for carbon dictionary meta chunk
+ CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk =
+ carbonDictionaryColumnMetaChunks.get(carbonDictionaryColumnMetaChunks.size() - 1);
+ // end offset till where the dictionary file has to be read
+ long endOffset = carbonDictionaryColumnMetaChunk.getEnd_offset();
+ return read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
+ }
+
+ /**
+ * This method will be used to read data between given start and end offset.
+ * Applicable scenarios:
+ * 1. Truncate operation. If there is any inconsistency while writing the dictionary file
+ * then we can give the start and end offset till where the data has to be retained.
+ *
+ * @param startOffset start offset of dictionary file
+ * @param endOffset end offset of dictionary file
+ * @return list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public List<byte[]> read(long startOffset, long endOffset) throws IOException {
+ List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks =
+ readDictionaryMetadataFile();
+ return read(carbonDictionaryColumnMetaChunks, startOffset, endOffset);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void close() throws IOException {
+ if (null != dictionaryFileReader) {
+ dictionaryFileReader.close();
+ dictionaryFileReader = null;
+ }
+ }
+
+ /**
+ * @param carbonDictionaryColumnMetaChunks dictionary meta chunk list
+ * @param startOffset start offset for dictionary data file
+ * @param endOffset end offset till where data has
+ * to be read from dictionary data file
+ * @return list of byte array dictionary values
+ * @throws IOException readDictionary file method throws IO exception
+ */
+ private List<byte[]> read(List<CarbonDictionaryColumnMetaChunk> carbonDictionaryColumnMetaChunks,
+ long startOffset, long endOffset) throws IOException {
+ // calculate the number of chunks to be read from dictionary file from start offset
+ int dictionaryChunkCountsToBeRead =
+ calculateTotalDictionaryChunkCountsToBeRead(carbonDictionaryColumnMetaChunks, startOffset,
+ endOffset);
+ // open dictionary file thrift reader
+ openThriftReader();
+ // read the required number of chunks from dictionary file
+ List<ColumnDictionaryChunk> columnDictionaryChunks =
+ readDictionaryFile(startOffset, dictionaryChunkCountsToBeRead);
+ // convert byte buffer list to byte array list of dictionary values
+ List<byte[]> dictionaryValues =
+ new ArrayList<byte[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (ColumnDictionaryChunk dictionaryChunk : columnDictionaryChunks) {
+ convertAndFillByteBufferListToByteArrayList(dictionaryValues, dictionaryChunk.getValues());
+ }
+ return dictionaryValues;
+ }
+
+ /**
+ * This method will convert and fill list of byte buffer to list of byte array
+ *
+ * @param dictionaryValues list of byte array. Each byte array is
+ * unique dictionary value
+ * @param dictionaryValueBufferList dictionary thrift object which is a list of byte buffer.
+ * Each dictionary value is a wrapped in byte buffer before
+ * writing to file
+ */
+ private void convertAndFillByteBufferListToByteArrayList(List<byte[]> dictionaryValues,
+ List<ByteBuffer> dictionaryValueBufferList) {
+ for (ByteBuffer buffer : dictionaryValueBufferList) {
+ // NOTE(review): uses limit() rather than remaining(); assumes the buffer's
+ // position is 0 when handed over by thrift — confirm against writer contract
+ int length = buffer.limit();
+ byte[] value = new byte[length];
+ buffer.get(value, 0, value.length);
+ dictionaryValues.add(value);
+ }
+ }
+
+ /**
+ * This method will form the path for dictionary file for a given column
+ */
+ protected void initFileLocation() {
+ PathService pathService = CarbonCommonFactory.getPathService();
+ CarbonTablePath carbonTablePath = pathService.getCarbonTablePath(columnIdentifier,
+ this.hdfsStorePath, carbonTableIdentifier);
+ this.columnDictionaryFilePath = carbonTablePath
+ .getDictionaryFilePath(columnIdentifier.getColumnId());
+ }
+
+ /**
+ * This method will read the dictionary file and return the list of dictionary thrift object
+ *
+ * @param dictionaryStartOffset start offset for dictionary file
+ * @param dictionaryChunkCountToBeRead number of dictionary chunks to be read
+ * @return list of dictionary chunks
+ * @throws IOException setReadOffset method throws I/O exception
+ */
+ private List<ColumnDictionaryChunk> readDictionaryFile(long dictionaryStartOffset,
+ int dictionaryChunkCountToBeRead) throws IOException {
+ List<ColumnDictionaryChunk> dictionaryChunks =
+ new ArrayList<ColumnDictionaryChunk>(dictionaryChunkCountToBeRead);
+ // skip the number of bytes if a start offset is given
+ dictionaryFileReader.setReadOffset(dictionaryStartOffset);
+ // read till dictionary chunk count
+ while (dictionaryFileReader.hasNext()
+ && dictionaryChunks.size() != dictionaryChunkCountToBeRead) {
+ dictionaryChunks.add((ColumnDictionaryChunk) dictionaryFileReader.read());
+ }
+ return dictionaryChunks;
+ }
+
+ /**
+ * This method will read the dictionary metadata file for a given column
+ * and calculate the number of chunks to be read from the dictionary file.
+ * It will do a strict validation for start and end offset as if the offsets are not
+ * exactly matching, because data is written in thrift format, the thrift object
+ * will not be retrieved properly
+ *
+ * @param dictionaryChunkMetaList list of dictionary chunk metadata
+ * @param dictionaryChunkStartOffset start offset for a dictionary chunk
+ * @param dictionaryChunkEndOffset end offset for a dictionary chunk
+ * @return number of dictionary chunks to be read between the given start and end offsets
+ */
+ private int calculateTotalDictionaryChunkCountsToBeRead(
+ List<CarbonDictionaryColumnMetaChunk> dictionaryChunkMetaList,
+ long dictionaryChunkStartOffset, long dictionaryChunkEndOffset) {
+ boolean chunkWithStartOffsetFound = false;
+ int dictionaryChunkCount = 0;
+ for (CarbonDictionaryColumnMetaChunk metaChunk : dictionaryChunkMetaList) {
+ // find the column meta chunk whose start offset value matches
+ // with the given dictionary start offset
+ if (!chunkWithStartOffsetFound && dictionaryChunkStartOffset == metaChunk.getStart_offset()) {
+ chunkWithStartOffsetFound = true;
+ }
+ // start offset is found then keep adding the chunk count to be read
+ if (chunkWithStartOffsetFound) {
+ dictionaryChunkCount = dictionaryChunkCount + metaChunk.getChunk_count();
+ }
+ // when end offset is reached then break the loop
+ if (dictionaryChunkEndOffset == metaChunk.getEnd_offset()) {
+ break;
+ }
+ }
+ return dictionaryChunkCount;
+ }
+
+ /**
+ * This method will read dictionary metadata file and return the dictionary meta chunks
+ *
+ * @return list of dictionary metadata chunks
+ * @throws IOException read and close method throws IO exception
+ */
+ private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException {
+ CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+ List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null;
+ // read metadata file
+ try {
+ dictionaryMetaChunkList = columnMetadataReaderImpl.read();
+ } finally {
+ // close the metadata reader
+ columnMetadataReaderImpl.close();
+ }
+ return dictionaryMetaChunkList;
+ }
+
+ /**
+ * @return a new dictionary metadata reader for this column; protected so that
+ * subclasses (and tests) can substitute their own implementation
+ */
+ protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+ return new CarbonDictionaryMetadataReaderImpl(this.hdfsStorePath, carbonTableIdentifier,
+ this.columnIdentifier);
+ }
+
+ /**
+ * This method will open the dictionary file stream for reading
+ *
+ * @throws IOException thrift reader open method throws IOException
+ */
+ private void openThriftReader() throws IOException {
+ if (null == dictionaryFileReader) {
+ // initialise dictionary file reader which will return dictionary thrift object
+ // dictionary thrift object contains a list of byte buffer
+ dictionaryFileReader =
+ new ThriftReader(this.columnDictionaryFilePath, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new ColumnDictionaryChunk();
+ }
+ });
+ // Open dictionary file reader
+ dictionaryFileReader.open();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
new file mode 100644
index 0000000..b9c3ae1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.FileFooter;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reads the metadata from fact file in org.apache.carbondata.format.FileFooter thrift object
+ */
+public class CarbonFooterReader {
+
+ //Fact file path
+ private String filePath;
+
+ //From which offset of file this metadata should be read
+ private long offset;
+
+ /**
+ * Constructor
+ *
+ * @param filePath fact file path
+ * @param offset offset in the file from which the footer should be read
+ */
+ public CarbonFooterReader(String filePath, long offset) {
+
+ this.filePath = filePath;
+ this.offset = offset;
+ }
+
+ /**
+ * It reads the metadata in FileFooter thrift object format.
+ * The underlying thrift reader is always closed, even when reading fails,
+ * so the file stream is never leaked.
+ *
+ * @return file footer thrift object read from the configured offset
+ * @throws IOException if an I/O error occurs while opening or reading the file
+ */
+ public FileFooter readFooter() throws IOException {
+ ThriftReader thriftReader = openThriftReader(filePath);
+ thriftReader.open();
+ try {
+ //Set the offset from where it should read
+ thriftReader.setReadOffset(offset);
+ return (FileFooter) thriftReader.read();
+ } finally {
+ // close in finally so a failure in setReadOffset/read does not leak the stream
+ thriftReader.close();
+ }
+ }
+
+ /**
+ * Open the thrift reader
+ *
+ * @param filePath fact file path
+ * @return thrift reader configured to create empty FileFooter instances
+ * @throws IOException declared for API compatibility; construction does no I/O here
+ */
+ private ThriftReader openThriftReader(String filePath) throws IOException {
+
+ ThriftReader thriftReader = new ThriftReader(filePath, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new FileFooter();
+ }
+ });
+ return thriftReader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
new file mode 100644
index 0000000..7f9a984
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.IndexHeader;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reader class which will be used to read the index file
+ */
+public class CarbonIndexFileReader {
+
+ /**
+ * reader; set by openThriftReader and used by all read/close methods
+ */
+ private ThriftReader thriftReader;
+
+ /**
+ * Below method will be used to read the index header
+ * Note: openThriftReader must have been called first; otherwise thriftReader
+ * is null and this method throws NPE
+ *
+ * @return index header
+ * @throws IOException if any problem while reader the header
+ */
+ public IndexHeader readIndexHeader() throws IOException {
+ IndexHeader indexHeader = (IndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new IndexHeader();
+ }
+ });
+ return indexHeader;
+ }
+
+ /**
+ * Below method will be used to close the reader
+ * Note: assumes openThriftReader was called first; throws NPE otherwise
+ */
+ public void closeThriftReader() {
+ thriftReader.close();
+ }
+
+ /**
+ * Below method will be used to read the block index from file
+ *
+ * @return block index info
+ * @throws IOException if problem while reading the block index
+ */
+ public BlockIndex readBlockIndexInfo() throws IOException {
+ BlockIndex blockInfo = (BlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new BlockIndex();
+ }
+ });
+ return blockInfo;
+ }
+
+ /**
+ * Open the thrift reader
+ *
+ * @param filePath index file path to open for reading
+ * @throws IOException if the file stream cannot be opened
+ */
+ public void openThriftReader(String filePath) throws IOException {
+ thriftReader = new ThriftReader(filePath);
+ thriftReader.open();
+ }
+
+ /**
+ * check if any more object is present
+ *
+ * @return true if any more object can be read
+ * @throws IOException if checking the remaining stream length fails
+ */
+ public boolean hasNext() throws IOException {
+ return thriftReader.hasNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
new file mode 100644
index 0000000..0958349
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+/**
+ * A simple class for reading Thrift objects (of a single type) from a fileName.
+ */
+public class ThriftReader {
+ /**
+ * buffer size
+ */
+ private static final int bufferSize = 2048;
+ /**
+ * File containing the objects.
+ */
+ private String fileName;
+ /**
+ * Used to create empty objects that will be initialized with values from the fileName.
+ */
+ private TBaseCreator creator;
+ /**
+ * For reading the fileName.
+ */
+ private DataInputStream dataInputStream;
+ /**
+ * For reading the binary thrift objects.
+ */
+ private TProtocol binaryIn;
+
+ /**
+ * Constructor.
+ */
+ public ThriftReader(String fileName, TBaseCreator creator) {
+ this.fileName = fileName;
+ this.creator = creator;
+ }
+
+ /**
+ * Constructor.
+ */
+ public ThriftReader(String fileName) {
+ this.fileName = fileName;
+ }
+
+ /**
+ * Opens the fileName for reading.
+ */
+ public void open() throws IOException {
+ FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+ dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize);
+ binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
+ }
+
+ /**
+ * This method will set the position of stream from where data has to be read
+ */
+ public void setReadOffset(long bytesToSkip) throws IOException {
+ if (dataInputStream.skip(bytesToSkip) != bytesToSkip) {
+ throw new IOException("It doesn't set the offset properly");
+ }
+ }
+
+ /**
+ * Checks if another objects is available by attempting to read another byte from the stream.
+ */
+ public boolean hasNext() throws IOException {
+ dataInputStream.mark(1);
+ int val = dataInputStream.read();
+ dataInputStream.reset();
+ return val != -1;
+ }
+
+ /**
+ * Reads the next object from the fileName.
+ */
+ public TBase read() throws IOException {
+ TBase t = creator.create();
+ try {
+ t.read(binaryIn);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ return t;
+ }
+
+ /**
+ * Reads the next object from the fileName.
+ *
+ * @param creator type of object which will be returned
+ * @throws IOException any problem while reading
+ */
+ public TBase read(TBaseCreator creator) throws IOException {
+ TBase t = creator.create();
+ try {
+ t.read(binaryIn);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ return t;
+ }
+
+ /**
+ * Close the fileName.
+ */
+ public void close() {
+ CarbonUtil.closeStreams(dataInputStream);
+ }
+
+ /**
+ * Thrift deserializes by taking an existing object and populating it. ThriftReader
+ * needs a way of obtaining instances of the class to be populated and this interface
+ * defines the mechanism by which a client provides these instances.
+ */
+ public static interface TBaseCreator {
+ TBase create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
new file mode 100644
index 0000000..e0bb413
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader.sortindex;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for reading the dictionary sort index and sort index inverted
+ */
+public interface CarbonDictionarySortIndexReader extends Closeable {
+
+ /**
+ * method for reading the carbon dictionary sort index data
+ * from columns sortIndex file.
+ *
+ * @return The method return's the list of dictionary sort Index and sort Index reverse
+ * @throws IOException In case any I/O error occurs
+ */
+ public List<Integer> readSortIndex() throws IOException;
+
+ /**
+ * method for reading the carbon dictionary inverted sort index data
+ * from columns sortIndex file.
+ *
+ * @return The method return's the list of dictionary inverted sort Index
+ * @throws IOException In case any I/O error occurs
+ */
+ public List<Integer> readInvertedSortIndex() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
new file mode 100644
index 0000000..70628b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.reader.sortindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.format.ColumnSortInfo;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Implementation for reading the dictionary sort index and inverted sort index .
+ */
+public class CarbonDictionarySortIndexReaderImpl implements CarbonDictionarySortIndexReader {
+
+ /**
+ * carbonTable Identifier holding the info of databaseName and tableName
+ */
+ protected CarbonTableIdentifier carbonTableIdentifier;
+
+ /**
+ * column name
+ */
+ protected ColumnIdentifier columnIdentifier;
+
+ /**
+ * hdfs store location
+ */
+ protected String carbonStorePath;
+
+ /**
+ * the path of the dictionary Sort Index file
+ */
+ protected String sortIndexFilePath;
+
+ /**
+ * Column sort info thrift instance.
+ */
+ ColumnSortInfo columnSortInfo = null;
+
+ /**
+ * Comment for <code>LOGGER</code>
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonDictionarySortIndexReaderImpl.class.getName());
+
+ /**
+ * dictionary sortIndex file Reader
+ */
+ private ThriftReader dictionarySortIndexThriftReader;
+
+ /**
+ * @param carbonTableIdentifier Carbon Table identifier holding the database name and table name
+ * @param columnIdentifier column name
+ * @param carbonStorePath carbon store path
+ */
+ public CarbonDictionarySortIndexReaderImpl(final CarbonTableIdentifier carbonTableIdentifier,
+ final ColumnIdentifier columnIdentifier, final String carbonStorePath) {
+ this.carbonTableIdentifier = carbonTableIdentifier;
+ this.columnIdentifier = columnIdentifier;
+ this.carbonStorePath = carbonStorePath;
+ }
+
+ /**
+ * method for reading the carbon dictionary sort index data
+ * from columns sortIndex file.
+ *
+ * @return The method return's the list of dictionary sort Index and sort Index reverse
+ * In case of no member for column empty list will be return
+ * @throws IOException In case any I/O error occurs
+ */
+ @Override public List<Integer> readSortIndex() throws IOException {
+ if (null == columnSortInfo) {
+ readColumnSortInfo();
+ }
+ return columnSortInfo.getSort_index();
+ }
+
+ /**
+ * method for reading the carbon dictionary sort index data
+ * from columns sortIndex file.
+ * In case of no member empty list will be return
+ *
+ * @throws IOException In case any I/O error occurs
+ */
+ private void readColumnSortInfo() throws IOException {
+ init();
+ try {
+ columnSortInfo = (ColumnSortInfo) dictionarySortIndexThriftReader.read();
+ } catch (IOException ie) {
+ LOGGER.error(ie, "problem while reading the column sort info.");
+ throw new IOException("problem while reading the column sort info.", ie);
+ } finally {
+ if (null != dictionarySortIndexThriftReader) {
+ dictionarySortIndexThriftReader.close();
+ }
+ }
+ }
+
+ /**
+ * method for reading the carbon dictionary inverted sort index data
+ * from columns sortIndex file.
+ *
+ * @return The method return's the list of dictionary inverted sort Index
+ * @throws IOException In case any I/O error occurs
+ */
+ @Override public List<Integer> readInvertedSortIndex() throws IOException {
+ if (null == columnSortInfo) {
+ readColumnSortInfo();
+ }
+ return columnSortInfo.getSort_index_inverted();
+ }
+
+ /**
+ * The method initializes the dictionary Sort Index file path
+ * and initialize and opens the thrift reader for dictionary sortIndex file.
+ *
+ * @throws IOException if any I/O errors occurs
+ */
+ private void init() throws IOException {
+ initPath();
+ openThriftReader();
+ }
+
+ protected void initPath() {
+ PathService pathService = CarbonCommonFactory.getPathService();
+ CarbonTablePath carbonTablePath =
+ pathService.getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
+ try {
+ CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry =
+ getChunkMetaObjectForLastSegmentEntry();
+ long dictOffset = chunkMetaObjectForLastSegmentEntry.getEnd_offset();
+ this.sortIndexFilePath =
+ carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+ if (!FileFactory
+ .isFileExist(this.sortIndexFilePath, FileFactory.getFileType(this.sortIndexFilePath))) {
+ this.sortIndexFilePath =
+ carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+ }
+ } catch (IOException e) {
+ this.sortIndexFilePath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+ }
+
+ }
+
+ /**
+ * This method will read the dictionary chunk metadata thrift object for last entry
+ *
+ * @return last entry of dictionary meta chunk
+ * @throws IOException if an I/O error occurs
+ */
+ private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
+ throws IOException {
+ CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+ try {
+ // read the last segment entry for dictionary metadata
+ return columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
+ } finally {
+ // Close metadata reader
+ columnMetadataReaderImpl.close();
+ }
+ }
+
+ /**
+ * @return
+ */
+ protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+ return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, carbonTableIdentifier,
+ columnIdentifier);
+ }
+
+ /**
+ * This method will open the dictionary sort index file stream for reading
+ *
+ * @throws IOException in case any I/O errors occurs
+ */
+ private void openThriftReader() throws IOException {
+ this.dictionarySortIndexThriftReader =
+ new ThriftReader(this.sortIndexFilePath, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new ColumnSortInfo();
+ }
+ });
+ dictionarySortIndexThriftReader.open();
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void close() throws IOException {
+ if (null != dictionarySortIndexThriftReader) {
+ dictionarySortIndexThriftReader.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
new file mode 100644
index 0000000..1c97082
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Column Unique id generator
+ */
+public interface ColumnUniqueIdService {
+
+ /**
+ * @param databaseName
+ * @param columnSchema
+ * @return generate unique id
+ */
+ public String generateUniqueId(String databaseName, ColumnSchema columnSchema);
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
new file mode 100644
index 0000000..9b9ade6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/DictionaryService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryReader;
+import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+
+/**
+ * Dictionary service to get writer and reader
+ */
+public interface DictionaryService {
+
+ /**
+ * get dictionary writer
+ *
+ * @param carbonTableIdentifier
+ * @param columnIdentifier
+ * @param carbonStorePath
+ * @return
+ */
+ public CarbonDictionaryWriter getDictionaryWriter(CarbonTableIdentifier carbonTableIdentifier,
+ ColumnIdentifier columnIdentifier, String carbonStorePath);
+
+ /**
+ * get dictionary sort index writer
+ *
+ * @param carbonTableIdentifier
+ * @param columnIdentifier
+ * @param carbonStorePath
+ * @return
+ */
+ public CarbonDictionarySortIndexWriter getDictionarySortIndexWriter(
+ CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+ String carbonStorePath);
+
+ /**
+ * get dictionary metadata reader
+ *
+ * @param carbonTableIdentifier
+ * @param columnIdentifier
+ * @param carbonStorePath
+ * @return
+ */
+ public CarbonDictionaryMetadataReader getDictionaryMetadataReader(
+ CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+ String carbonStorePath);
+
+ /**
+ * get dictionary reader
+ *
+ * @param carbonTableIdentifier
+ * @param columnIdentifier
+ * @param carbonStorePath
+ * @return
+ */
+ public CarbonDictionaryReader getDictionaryReader(CarbonTableIdentifier carbonTableIdentifier,
+ ColumnIdentifier columnIdentifier, String carbonStorePath);
+
+ /**
+ * get dictionary sort index reader
+ *
+ * @param carbonTableIdentifier
+ * @param columnIdentifier
+ * @param carbonStorePath
+ * @return
+ */
+ public CarbonDictionarySortIndexReader getDictionarySortIndexReader(
+ CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier,
+ String carbonStorePath);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/service/PathService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/PathService.java b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
new file mode 100644
index 0000000..d3295f5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/service/PathService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+
+/**
+ * Create helper to get path details
+ */
+public interface PathService {
+
+ /**
+ * @param columnIdentifier
+ * @param storeLocation
+ * @param tableIdentifier
+ * @return store path related to tables
+ */
+ CarbonTablePath getCarbonTablePath(ColumnIdentifier columnIdentifier, String storeLocation,
+ CarbonTableIdentifier tableIdentifier);
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
new file mode 100644
index 0000000..2f91d1e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Util class for byte comparision
+ */
+public final class ByteUtil {
+
+ private static final int SIZEOF_LONG = 8;
+
+ private ByteUtil() {
+
+ }
+
+ /**
+ * Compare method for bytes
+ *
+ * @param buffer1
+ * @param buffer2
+ * @return
+ */
+ public static int compare(byte[] buffer1, byte[] buffer2) {
+ // Short circuit equal case
+ if (buffer1 == buffer2) {
+ return 0;
+ }
+ // Bring WritableComparator code local
+ int i = 0;
+ int j = 0;
+ for (; i < buffer1.length && j < buffer2.length; i++, j++) {
+ int a = (buffer1[i] & 0xff);
+ int b = (buffer2[j] & 0xff);
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * covert the long[] to int[]
+ *
+ * @param longArray
+ * @return
+ */
+ public static int[] convertToIntArray(long[] longArray) {
+ int[] intArray = new int[longArray.length];
+ for (int i = 0; i < longArray.length; i++) {
+ intArray[i] = (int) longArray[i];
+
+ }
+ return intArray;
+ }
+
+ /**
+ * Unsafe comparator
+ */
+ public enum UnsafeComparer {
+ /**
+ * instance.
+ */
+ INSTANCE;
+
+ /**
+ * unsafe .
+ */
+ static final sun.misc.Unsafe THEUNSAFE;
+
+ /**
+ * The offset to the first element in a byte array.
+ */
+ static final int BYTE_ARRAY_BASE_OFFSET;
+ static final boolean LITTLEENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+ static {
+ THEUNSAFE = (sun.misc.Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ @Override public Object run() {
+ try {
+ Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return f.get(null);
+ } catch (NoSuchFieldException e) {
+ // It doesn't matter what we throw;
+ // it's swallowed in getBestComparer().
+ throw new Error();
+ } catch (IllegalAccessException e) {
+ throw new Error();
+ }
+ }
+ });
+
+ BYTE_ARRAY_BASE_OFFSET = THEUNSAFE.arrayBaseOffset(byte[].class);
+
+ // sanity check - this should never fail
+ if (THEUNSAFE.arrayIndexScale(byte[].class) != 1) {
+ throw new AssertionError();
+ }
+
+ }
+
+ /**
+ * Returns true if x1 is less than x2, when both values are treated as
+ * unsigned.
+ */
+ static boolean lessThanUnsigned(long x1, long x2) {
+ return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
+ }
+
+ /**
+ * Lexicographically compare two arrays.
+ *
+ * @param buffer1 left operand
+ * @param buffer2 right operand
+ * @param offset1 Where to start comparing in the left buffer
+ * @param offset2 Where to start comparing in the right buffer
+ * @param length1 How much to compare from the left buffer
+ * @param length2 How much to compare from the right buffer
+ * @return 0 if equal, < 0 if left is less than right, etc.
+ */
+ public int compareTo(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2,
+ int length2) {
+ // Short circuit equal case
+ if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2) {
+ return 0;
+ }
+ int minLength = Math.min(length1, length2);
+ int minWords = minLength / SIZEOF_LONG;
+ int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
+ int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
+
+ /*
+ * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
+ * at a time is no slower than comparing 4 bytes at a time even on
+ * 32-bit. On the other hand, it is substantially faster on 64-bit.
+ */
+ for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
+ long lw = THEUNSAFE.getLong(buffer1, offset1Adj + (long) i);
+ long rw = THEUNSAFE.getLong(buffer2, offset2Adj + (long) i);
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (!LITTLEENDIAN) {
+ return lessThanUnsigned(lw, rw) ? -1 : 1;
+ }
+
+ // Use binary search
+ int n = 0;
+ int y;
+ int x = (int) diff;
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+ n = 32;
+ }
+
+ y = x << 16;
+ if (y == 0) {
+ n += 16;
+ } else {
+ x = y;
+ }
+
+ y = x << 8;
+ if (y == 0) {
+ n += 8;
+ }
+ return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+ }
+ }
+
+ // The epilogue to cover the last (minLength % 8) elements.
+ for (int i = minWords * SIZEOF_LONG; i < minLength; i++) {
+ int a = (buffer1[offset1 + i] & 0xff);
+ int b = (buffer2[offset2 + i] & 0xff);
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return length1 - length2;
+ }
+
+ public int compareTo(byte[] buffer1, byte[] buffer2) {
+
+ // Short circuit equal case
+ if (buffer1 == buffer2) {
+ return 0;
+ }
+ int len1 = buffer1.length;
+ int len2 = buffer2.length;
+ int minLength = (len1 <= len2) ? len1 : len2;
+ int minWords = 0;
+
+ /*
+ * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
+ * at a time is no slower than comparing 4 bytes at a time even on
+ * 32-bit. On the other hand, it is substantially faster on 64-bit.
+ */
+ if (minLength > 7) {
+ minWords = minLength / SIZEOF_LONG;
+ for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
+ long lw = THEUNSAFE.getLong(buffer1, BYTE_ARRAY_BASE_OFFSET + (long) i);
+ long rw = THEUNSAFE.getLong(buffer2, BYTE_ARRAY_BASE_OFFSET + (long) i);
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (!LITTLEENDIAN) {
+ return lessThanUnsigned(lw, rw) ? -1 : 1;
+ }
+
+ // Use binary search
+ int k = 0;
+ int y;
+ int x = (int) diff;
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+ k = 32;
+ }
+ y = x << 16;
+ if (y == 0) {
+ k += 16;
+ } else {
+ x = y;
+ }
+
+ y = x << 8;
+ if (y == 0) {
+ k += 8;
+ }
+ return (int) (((lw >>> k) & 0xFFL) - ((rw >>> k) & 0xFFL));
+ }
+ }
+ }
+
+ // The epilogue to cover the last (minLength % 8) elements.
+ for (int i = minWords * SIZEOF_LONG; i < minLength; i++) {
+ int a = (buffer1[i] & 0xff);
+ int b = (buffer2[i] & 0xff);
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return len1 - len2;
+ }
+
+ public boolean equals(byte[] buffer1, byte[] buffer2) {
+ if (buffer1.length != buffer2.length) {
+ return false;
+ }
+ int len = buffer1.length / 8;
+ long currentOffset = BYTE_ARRAY_BASE_OFFSET;
+ for (int i = 0; i < len; i++) {
+ long lw = THEUNSAFE.getLong(buffer1, currentOffset);
+ long rw = THEUNSAFE.getLong(buffer2, currentOffset);
+ if (lw != rw) {
+ return false;
+ }
+ currentOffset += 8;
+ }
+ len = buffer1.length % 8;
+ if (len > 0) {
+ for (int i = 0; i < len; i += 1) {
+ long lw = THEUNSAFE.getByte(buffer1, currentOffset);
+ long rw = THEUNSAFE.getByte(buffer2, currentOffset);
+ if (lw != rw) {
+ return false;
+ }
+ currentOffset += 1;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Comparing the 2 byte buffers. This is used in case of data load sorting step.
+ *
+ * @param byteBuffer1
+ * @param byteBuffer2
+ * @return
+ */
+ public int compareTo(ByteBuffer byteBuffer1, ByteBuffer byteBuffer2) {
+
+ // Short circuit equal case
+ if (byteBuffer1 == byteBuffer2) {
+ return 0;
+ }
+ int len1 = byteBuffer1.remaining();
+ int len2 = byteBuffer2.remaining();
+ byte[] buffer1 = new byte[len1];
+ byte[] buffer2 = new byte[len2];
+ byteBuffer1.get(buffer1);
+ byteBuffer2.get(buffer2);
+ return compareTo(buffer1, buffer2);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
new file mode 100644
index 0000000..c60865d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonFileFolderComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+
+public class CarbonFileFolderComparator implements Comparator<CarbonFile> {
+
+ /**
+ * Below method will be used to compare two file
+ *
+ * @param o1 first file
+ * @param o2 Second file
+ * @return compare result
+ */
+ @Override public int compare(CarbonFile o1, CarbonFile o2) {
+ String firstFileName = o1.getName();
+ String secondFileName = o2.getName();
+ int lastIndexOfO1 = firstFileName.lastIndexOf('_');
+ int lastIndexOfO2 = secondFileName.lastIndexOf('_');
+ int file1 = 0;
+ int file2 = 0;
+
+ try {
+ file1 = Integer.parseInt(firstFileName.substring(lastIndexOfO1 + 1));
+ file2 = Integer.parseInt(secondFileName.substring(lastIndexOfO2 + 1));
+ } catch (NumberFormatException e) {
+ return -1;
+ }
+ return (file1 < file2) ? -1 : (file1 == file2 ? 0 : 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
new file mode 100644
index 0000000..ac504f0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsDummy.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+public class CarbonLoadStatisticsDummy implements LoadStatistics {
+ private CarbonLoadStatisticsDummy() {
+
+ }
+
+ private static CarbonLoadStatisticsDummy carbonLoadStatisticsDummyInstance =
+ new CarbonLoadStatisticsDummy();
+
+ public static CarbonLoadStatisticsDummy getInstance() {
+ return carbonLoadStatisticsDummyInstance;
+ }
+
+ @Override
+ public void initPartitonInfo(String PartitionId) {
+
+ }
+
+ @Override
+ public void recordDicShuffleAndWriteTime() {
+
+ }
+
+ @Override
+ public void recordLoadCsvfilesToDfTime() {
+
+ }
+
+ @Override
+ public void recordDictionaryValuesTotalTime(String partitionID,
+ Long dictionaryValuesTotalTimeTimePoint) {
+
+ }
+
+ @Override
+ public void recordCsvInputStepTime(String partitionID, Long csvInputStepTimePoint) {
+
+ }
+
+ @Override
+ public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+
+ }
+
+ @Override
+ public void recordGeneratingDictionaryValuesTime(String partitionID,
+ Long generatingDictionaryValuesTimePoint) {
+
+ }
+
+ @Override
+ public void recordSortRowsStepTotalTime(String partitionID, Long sortRowsStepTotalTimePoint) {
+
+ }
+
+ @Override
+ public void recordMdkGenerateTotalTime(String partitionID, Long mdkGenerateTotalTimePoint) {
+
+ }
+
+ @Override
+ public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+ Long dictionaryValue2MdkAdd2FileTimePoint) {
+
+ }
+
+ @Override
+ public void recordTotalRecords(long totalRecords) {
+
+ }
+
+ @Override
+ public void recordHostBlockMap(String host, Integer numBlocks) {
+
+ }
+
+ @Override
+ public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+
+ }
+
+ @Override
+ public void printStatisticsInfo(String partitionID) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
new file mode 100644
index 0000000..c9fc8ba
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * A util which provide methods used to record time information druing data loading.
+ */
+public class CarbonLoadStatisticsImpl implements LoadStatistics {
+ private CarbonLoadStatisticsImpl() {
+
+ }
+
+ private static CarbonLoadStatisticsImpl carbonLoadStatisticsImplInstance =
+ new CarbonLoadStatisticsImpl();
+
+ public static CarbonLoadStatisticsImpl getInstance() {
+ return carbonLoadStatisticsImplInstance;
+ }
+
+ private final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonLoadStatisticsImpl.class.getName());
+
+ /*
+ *We only care about the earliest start time(EST) and the latest end time(LET) of different
+ *threads, who does the same thing, LET - EST is the cost time of doing one thing using
+ *multiple thread.
+ */
+ private long loadCsvfilesToDfStartTime = 0;
+ private long loadCsvfilesToDfCostTime = 0;
+ private long dicShuffleAndWriteFileTotalStartTime = 0;
+ private long dicShuffleAndWriteFileTotalCostTime = 0;
+
+ //LRU cache load one time
+ private double lruCacheLoadTime = 0;
+
+ //Generate surrogate keys total time for each partition:
+ private ConcurrentHashMap<String, Long[]> parDictionaryValuesTotalTimeMap =
+ new ConcurrentHashMap<String, Long[]>();
+ private ConcurrentHashMap<String, Long[]> parCsvInputStepTimeMap =
+ new ConcurrentHashMap<String, Long[]>();
+ private ConcurrentHashMap<String, Long[]> parGeneratingDictionaryValuesTimeMap =
+ new ConcurrentHashMap<String, Long[]>();
+
+ //Sort rows step total time for each partition:
+ private ConcurrentHashMap<String, Long[]> parSortRowsStepTotalTimeMap =
+ new ConcurrentHashMap<String, Long[]>();
+
+ //MDK generate total time for each partition:
+ private ConcurrentHashMap<String, Long[]> parMdkGenerateTotalTimeMap =
+ new ConcurrentHashMap<String, Long[]>();
+ private ConcurrentHashMap<String, Long[]> parDictionaryValue2MdkAdd2FileTime =
+ new ConcurrentHashMap<String, Long[]>();
+
+ //Node block process information
+ private ConcurrentHashMap<String, Integer> hostBlockMap =
+ new ConcurrentHashMap<String, Integer>();
+
+ //Partition block process information
+ private ConcurrentHashMap<String, Integer> partitionBlockMap =
+ new ConcurrentHashMap<String, Integer>();
+
+ private long totalRecords = 0;
+ private double totalTime = 0;
+
+ @Override
+ public void initPartitonInfo(String PartitionId) {
+ parDictionaryValuesTotalTimeMap.put(PartitionId, new Long[2]);
+ parCsvInputStepTimeMap.put(PartitionId, new Long[2]);
+ parSortRowsStepTotalTimeMap.put(PartitionId, new Long[2]);
+ parGeneratingDictionaryValuesTimeMap.put(PartitionId, new Long[2]);
+ parMdkGenerateTotalTimeMap.put(PartitionId, new Long[2]);
+ parDictionaryValue2MdkAdd2FileTime.put(PartitionId, new Long[2]);
+ }
+
+ //Record the time
+ public void recordDicShuffleAndWriteTime() {
+ Long dicShuffleAndWriteTimePoint = System.currentTimeMillis();
+ if (0 == dicShuffleAndWriteFileTotalStartTime) {
+ dicShuffleAndWriteFileTotalStartTime = dicShuffleAndWriteTimePoint;
+ }
+ if (dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime >
+ dicShuffleAndWriteFileTotalCostTime) {
+ dicShuffleAndWriteFileTotalCostTime =
+ dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime;
+ }
+ }
+
+ public void recordLoadCsvfilesToDfTime() {
+ Long loadCsvfilesToDfTimePoint = System.currentTimeMillis();
+ if (0 == loadCsvfilesToDfStartTime) {
+ loadCsvfilesToDfStartTime = loadCsvfilesToDfTimePoint;
+ }
+ if (loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime > loadCsvfilesToDfCostTime) {
+ loadCsvfilesToDfCostTime = loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime;
+ }
+ }
+
+ public double getLruCacheLoadTime() {
+ return lruCacheLoadTime;
+ }
+
+ public void recordDictionaryValuesTotalTime(String partitionID,
+ Long dictionaryValuesTotalTimeTimePoint) {
+ if (null != parDictionaryValuesTotalTimeMap.get(partitionID)) {
+ if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[0]) {
+ parDictionaryValuesTotalTimeMap.get(partitionID)[0] = dictionaryValuesTotalTimeTimePoint;
+ }
+ if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[1] ||
+ dictionaryValuesTotalTimeTimePoint - parDictionaryValuesTotalTimeMap.get(partitionID)[0] >
+ parDictionaryValuesTotalTimeMap.get(partitionID)[1]) {
+ parDictionaryValuesTotalTimeMap.get(partitionID)[1] = dictionaryValuesTotalTimeTimePoint -
+ parDictionaryValuesTotalTimeMap.get(partitionID)[0];
+ }
+ }
+ }
+
+ public void recordCsvInputStepTime(String partitionID,
+ Long csvInputStepTimePoint) {
+ if (null != parCsvInputStepTimeMap.get(partitionID)) {
+ if (null == parCsvInputStepTimeMap.get(partitionID)[0]) {
+ parCsvInputStepTimeMap.get(partitionID)[0] = csvInputStepTimePoint;
+ }
+ if (null == parCsvInputStepTimeMap.get(partitionID)[1] ||
+ csvInputStepTimePoint - parCsvInputStepTimeMap.get(partitionID)[0] >
+ parCsvInputStepTimeMap.get(partitionID)[1]) {
+ parCsvInputStepTimeMap.get(partitionID)[1] = csvInputStepTimePoint -
+ parCsvInputStepTimeMap.get(partitionID)[0];
+ }
+ }
+ }
+
+ public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+ this.lruCacheLoadTime = lruCacheLoadTime;
+ }
+
+ public void recordGeneratingDictionaryValuesTime(String partitionID,
+ Long generatingDictionaryValuesTimePoint) {
+ if (null != parGeneratingDictionaryValuesTimeMap.get(partitionID)) {
+ if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[0]) {
+ parGeneratingDictionaryValuesTimeMap.get(partitionID)[0] =
+ generatingDictionaryValuesTimePoint;
+ }
+ if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] ||
+ generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+ .get(partitionID)[0] > parGeneratingDictionaryValuesTimeMap
+ .get(partitionID)[1]) {
+ parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] =
+ generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+ .get(partitionID)[0];
+ }
+ }
+ }
+
+ public void recordSortRowsStepTotalTime(String partitionID,
+ Long sortRowsStepTotalTimePoint) {
+ if (null != parSortRowsStepTotalTimeMap.get(partitionID)) {
+ if (null == parSortRowsStepTotalTimeMap.get(partitionID)[0]) {
+ parSortRowsStepTotalTimeMap.get(partitionID)[0] = sortRowsStepTotalTimePoint;
+ }
+ if (null == parSortRowsStepTotalTimeMap.get(partitionID)[1] ||
+ sortRowsStepTotalTimePoint - parSortRowsStepTotalTimeMap.get(partitionID)[0] >
+ parSortRowsStepTotalTimeMap.get(partitionID)[1]) {
+ parSortRowsStepTotalTimeMap.get(partitionID)[1] = sortRowsStepTotalTimePoint -
+ parSortRowsStepTotalTimeMap.get(partitionID)[0];
+ }
+ }
+ }
+
+ public void recordMdkGenerateTotalTime(String partitionID,
+ Long mdkGenerateTotalTimePoint) {
+ if (null != parMdkGenerateTotalTimeMap.get(partitionID)) {
+ if (null == parMdkGenerateTotalTimeMap.get(partitionID)[0]) {
+ parMdkGenerateTotalTimeMap.get(partitionID)[0] = mdkGenerateTotalTimePoint;
+ }
+ if (null == parMdkGenerateTotalTimeMap.get(partitionID)[1] ||
+ mdkGenerateTotalTimePoint - parMdkGenerateTotalTimeMap.get(partitionID)[0] >
+ parMdkGenerateTotalTimeMap.get(partitionID)[1]) {
+ parMdkGenerateTotalTimeMap.get(partitionID)[1] = mdkGenerateTotalTimePoint -
+ parMdkGenerateTotalTimeMap.get(partitionID)[0];
+ }
+ }
+ }
+
+ public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+ Long dictionaryValue2MdkAdd2FileTimePoint) {
+ if (null != parDictionaryValue2MdkAdd2FileTime.get(partitionID)) {
+ if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0]) {
+ parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0] =
+ dictionaryValue2MdkAdd2FileTimePoint;
+ }
+ if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] ||
+ dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+ .get(partitionID)[0] > parDictionaryValue2MdkAdd2FileTime
+ .get(partitionID)[1]) {
+ parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] =
+ dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+ .get(partitionID)[0];
+ }
+ }
+ }
+
+ //Record the node blocks information map
+ public void recordHostBlockMap(String host, Integer numBlocks) {
+ hostBlockMap.put(host, numBlocks);
+ }
+
+ //Record the partition blocks information map
+ public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+ partitionBlockMap.put(partitionID, numBlocks);
+ }
+
+ public void recordTotalRecords(long totalRecords) {
+ this.totalRecords = totalRecords;
+ }
+
+ //Get the time
+ private double getDicShuffleAndWriteFileTotalTime() {
+ return dicShuffleAndWriteFileTotalCostTime / 1000.0;
+ }
+
+ private double getLoadCsvfilesToDfTime() {
+ return loadCsvfilesToDfCostTime / 1000.0;
+ }
+
+ private double getDictionaryValuesTotalTime(String partitionID) {
+ return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
+ }
+
+ private double getCsvInputStepTime(String partitionID) {
+ return parCsvInputStepTimeMap.get(partitionID)[1] / 1000.0;
+ }
+
+ private double getGeneratingDictionaryValuesTime(String partitionID) {
+ return parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] / 1000.0;
+ }
+
+ private double getSortRowsStepTotalTime(String partitionID) {
+ return parSortRowsStepTotalTimeMap.get(partitionID)[1] / 1000.0;
+ }
+
+ private double getDictionaryValue2MdkAdd2FileTime(String partitionID) {
+ return parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] / 1000.0;
+ }
+
+ //Get the hostBlockMap
+ private ConcurrentHashMap<String, Integer> getHostBlockMap() {
+ return hostBlockMap;
+ }
+
+ //Get the partitionBlockMap
+ private ConcurrentHashMap<String, Integer> getPartitionBlockMap() {
+ return partitionBlockMap;
+ }
+
+ //Speed calculate
+ private long getTotalRecords() {
+ return this.totalRecords;
+ }
+
+ private int getLoadSpeed() {
+ return (int)(totalRecords / totalTime);
+ }
+
+ private int getGenDicSpeed() {
+ return (int)(totalRecords / getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime());
+ }
+
+ private int getReadCSVSpeed(String partitionID) {
+ return (int)(totalRecords / getCsvInputStepTime(partitionID));
+ }
+
+ private int getGenSurKeySpeed(String partitionID) {
+ return (int)(totalRecords / getGeneratingDictionaryValuesTime(partitionID));
+ }
+
+ private int getSortKeySpeed(String partitionID) {
+ return (int)(totalRecords / getSortRowsStepTotalTime(partitionID));
+ }
+
+ private int getMDKSpeed(String partitionID) {
+ return (int)(totalRecords / getDictionaryValue2MdkAdd2FileTime(partitionID));
+ }
+
+ private double getTotalTime(String partitionID) {
+ this.totalTime = getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime() +
+ getLruCacheLoadTime() + getDictionaryValuesTotalTime(partitionID) +
+ getDictionaryValue2MdkAdd2FileTime(partitionID);
+ return totalTime;
+ }
+
+ //Print the statistics information
+ private void printDicGenStatisticsInfo() {
+ double loadCsvfilesToDfTime = getLoadCsvfilesToDfTime();
+ LOGGER.audit("STAGE 1 ->Load csv to DataFrame and generate" +
+ " block distinct values: " + loadCsvfilesToDfTime + "(s)");
+ double dicShuffleAndWriteFileTotalTime = getDicShuffleAndWriteFileTotalTime();
+ LOGGER.audit("STAGE 2 ->Global dict shuffle and write dict file: " +
+ + dicShuffleAndWriteFileTotalTime + "(s)");
+ }
+
+ private void printLruCacheLoadTimeInfo() {
+ LOGGER.audit("STAGE 3 ->LRU cache load: " + getLruCacheLoadTime() + "(s)");
+ }
+
+ private void printDictionaryValuesGenStatisticsInfo(String partitionID) {
+ double dictionaryValuesTotalTime = getDictionaryValuesTotalTime(partitionID);
+ LOGGER.audit("STAGE 4 ->Total cost of gen dictionary values, sort and write to temp files: "
+ + dictionaryValuesTotalTime + "(s)");
+ double csvInputStepTime = getCsvInputStepTime(partitionID);
+ double generatingDictionaryValuesTime = getGeneratingDictionaryValuesTime(partitionID);
+ LOGGER.audit("STAGE 4.1 -> |_read csv file: " + csvInputStepTime + "(s)");
+ LOGGER.audit("STAGE 4.2 -> |_transform to surrogate key: "
+ + generatingDictionaryValuesTime + "(s)");
+ }
+
+ private void printSortRowsStepStatisticsInfo(String partitionID) {
+ double sortRowsStepTotalTime = getSortRowsStepTotalTime(partitionID);
+ LOGGER.audit("STAGE 4.3 -> |_sort rows and write to temp file: "
+ + sortRowsStepTotalTime + "(s)");
+ }
+
+ private void printGenMdkStatisticsInfo(String partitionID) {
+ double dictionaryValue2MdkAdd2FileTime = getDictionaryValue2MdkAdd2FileTime(partitionID);
+ LOGGER.audit("STAGE 5 ->Transform to MDK, compress and write fact files: "
+ + dictionaryValue2MdkAdd2FileTime + "(s)");
+ }
+
+ //Print the node blocks information
+ private void printHostBlockMapInfo() {
+ LOGGER.audit("========== BLOCK_INFO ==========");
+ if (getHostBlockMap().size() > 0) {
+ for (String host: getHostBlockMap().keySet()) {
+ LOGGER.audit("BLOCK_INFO ->Node host: " + host);
+ LOGGER.audit("BLOCK_INFO ->The block count in this node: " + getHostBlockMap().get(host));
+ }
+ } else if (getPartitionBlockMap().size() > 0) {
+ for (String parID: getPartitionBlockMap().keySet()) {
+ LOGGER.audit("BLOCK_INFO ->Partition ID: " + parID);
+ LOGGER.audit("BLOCK_INFO ->The block count in this partition: " +
+ getPartitionBlockMap().get(parID));
+ }
+ }
+ }
+
+ //Print the speed information
+ private void printLoadSpeedInfo(String partitionID) {
+ LOGGER.audit("===============Load_Speed_Info===============");
+ LOGGER.audit("Total Num of Records Processed: " + getTotalRecords());
+ LOGGER.audit("Total Time Cost: " + getTotalTime(partitionID) + "(s)");
+ LOGGER.audit("Total Load Speed: " + getLoadSpeed() + "records/s");
+ LOGGER.audit("Generate Dictionaries Speed: " + getGenDicSpeed() + "records/s");
+ LOGGER.audit("Read CSV Speed: " + getReadCSVSpeed(partitionID) + " records/s");
+ LOGGER.audit("Generate Surrogate Key Speed: " + getGenSurKeySpeed(partitionID) + " records/s");
+ LOGGER.audit("Sort Key/Write Temp Files Speed: " + getSortKeySpeed(partitionID) + " records/s");
+ LOGGER.audit("MDK Step Speed: " + getMDKSpeed(partitionID) + " records/s");
+ LOGGER.audit("=============================================");
+ }
+
+ public void printStatisticsInfo(String partitionID) {
+ try {
+ LOGGER.audit("========== TIME_STATISTICS PartitionID: " + partitionID + "==========");
+ printDicGenStatisticsInfo();
+ printLruCacheLoadTimeInfo();
+ printDictionaryValuesGenStatisticsInfo(partitionID);
+ printSortRowsStepStatisticsInfo(partitionID);
+ printGenMdkStatisticsInfo(partitionID);
+ printHostBlockMapInfo();
+ printLoadSpeedInfo(partitionID);
+ } catch (Exception e) {
+ LOGGER.audit("Can't print Statistics Information");
+ } finally {
+ resetLoadStatistics();
+ }
+ }
+
+ //Reset the load statistics values
+ private void resetLoadStatistics() {
+ loadCsvfilesToDfStartTime = 0;
+ loadCsvfilesToDfCostTime = 0;
+ dicShuffleAndWriteFileTotalStartTime = 0;
+ dicShuffleAndWriteFileTotalCostTime = 0;
+ lruCacheLoadTime = 0;
+ totalRecords = 0;
+ totalTime = 0;
+ parDictionaryValuesTotalTimeMap.clear();
+ parCsvInputStepTimeMap.clear();
+ parSortRowsStepTotalTimeMap.clear();
+ parGeneratingDictionaryValuesTimeMap.clear();
+ parMdkGenerateTotalTimeMap.clear();
+ parDictionaryValue2MdkAdd2FileTime.clear();
+ }
+
+}