You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:19 UTC
[34/52] [partial] incubator-carbondata git commit: Renamed packages
to org.apache.carbondata and fixed errors
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java b/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
new file mode 100644
index 0000000..9fb1e6e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * Contract for collecting statistics about a data load: time points of the
+ * individual load steps, the block distribution per host and per partition,
+ * and the total number of processed records.
+ */
+public interface LoadStatistics {
+ // Init the partition info for the given partition id
+ void initPartitonInfo(String PartitionId);
+
+ // Record the time of the individual load steps. The per-partition variants
+ // take the partition id together with the time point to record.
+ void recordDicShuffleAndWriteTime();
+
+ void recordLoadCsvfilesToDfTime();
+
+ void recordDictionaryValuesTotalTime(String partitionID,
+ Long dictionaryValuesTotalTimeTimePoint);
+
+ void recordCsvInputStepTime(String partitionID,
+ Long csvInputStepTimePoint);
+
+ void recordLruCacheLoadTime(double lruCacheLoadTime);
+
+ void recordGeneratingDictionaryValuesTime(String partitionID,
+ Long generatingDictionaryValuesTimePoint);
+
+ void recordSortRowsStepTotalTime(String partitionID,
+ Long sortRowsStepTotalTimePoint);
+
+ void recordMdkGenerateTotalTime(String partitionID,
+ Long mdkGenerateTotalTimePoint);
+
+ void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+ Long dictionaryValue2MdkAdd2FileTimePoint);
+
+ // Record the node blocks information map (host -> number of blocks)
+ void recordHostBlockMap(String host, Integer numBlocks);
+
+ // Record the partition blocks information map (partition id -> number of blocks)
+ void recordPartitionBlockMap(String partitionID, Integer numBlocks);
+
+ // Record the total number of records processed
+ void recordTotalRecords(long totalRecords);
+
+ // Print the statistics information of the given partition
+ void printStatisticsInfo(String partitionID);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
new file mode 100644
index 0000000..4a229d6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -0,0 +1,1027 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressByteArray;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressDefaultLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneShort;
+
+public final class ValueCompressionUtil {
+
+ /**
+ * Carbon logger for this utility class
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ValueCompressionUtil.class.getName());
+
+ /**
+ * Private constructor: this class only exposes static helpers and is not
+ * meant to be instantiated.
+ */
+ private ValueCompressionUtil() {
+
+ }
+
+ /**
+  * Decides the smallest data type that can hold the given value.
+  * <p>
+  * Only the upper bound of each type is checked, and the check is strict
+  * (a value equal to a type's MAX_VALUE is moved to the next wider type).
+  *
+  * @param value            the measure value to classify
+  * @param decimal          number of decimal digits of the measure; 0 selects the
+  *                         integral types
+  * @param dataTypeSelected when decimal is non-zero, only the value 1 enables
+  *                         the float check; any other value yields DATA_DOUBLE
+  * @return the data type chosen for the value, DATA_DOUBLE when nothing
+  *         narrower applies
+  */
+ private static DataType getDataType(double value, int decimal, byte dataTypeSelected) {
+   if (decimal == 0) {
+     // integral measure: take the first type whose upper bound is above the value
+     if (value < Byte.MAX_VALUE) {
+       return DataType.DATA_BYTE;
+     }
+     if (value < Short.MAX_VALUE) {
+       return DataType.DATA_SHORT;
+     }
+     if (value < Integer.MAX_VALUE) {
+       return DataType.DATA_INT;
+     }
+     if (value < Long.MAX_VALUE) {
+       return DataType.DATA_LONG;
+     }
+     return DataType.DATA_DOUBLE;
+   }
+   // decimal measure: float is only usable when it was requested, the value is
+   // below Float.MAX_VALUE and the float round trip loses no precision
+   boolean exactAsFloat = dataTypeSelected == 1
+       && value < Float.MAX_VALUE
+       && ((float) value) - value == 0;
+   return exactAsFloat ? DataType.DATA_FLOAT : DataType.DATA_DOUBLE;
+ }
+
+ /**
+  * Gives the storage size of a data type.
+  *
+  * @param dataType measure value type
+  * @return the size in bytes: 1 for byte, 2 for short, 4 for int and float,
+  *         8 for every other type
+  */
+ public static int getSize(DataType dataType) {
+   final int sizeInBytes;
+   switch (dataType) {
+     case DATA_BYTE:
+       sizeInBytes = 1;
+       break;
+     case DATA_SHORT:
+       sizeInBytes = 2;
+       break;
+     case DATA_INT:
+     case DATA_FLOAT:
+       sizeInBytes = 4;
+       break;
+     default:
+       sizeInBytes = 8;
+       break;
+   }
+   return sizeInBytes;
+ }
+
+ /**
+ * get the best compression type. priority list,from high to low:
+ * COMPRESSION_TYPE.NONE COMPRESSION_TYPE.MAX_MIN
+ * COMPRESSION_TYPE.NON_DECIMAL_CONVERT COMPRESSION_TYPE.MAX_MIN_NDC
+ *
+ * @param maxValue : max value of one measure
+ * @param minValue : min value of one measure
+ * @param decimal : decimal num of one measure
+ * @return : the best compression type
+ * @see
+ */
+ private static CompressionFinder getCompressionType(Object maxValue, Object minValue, int decimal,
+ char aggregatorType, byte dataTypeSelected) {
+ // 'c' for aggregate table,'b' fo rBigdecimal, 'l' for long,'n' for double
+ switch (aggregatorType) {
+ case 'c':
+ return new CompressionFinder(COMPRESSION_TYPE.CUSTOM, DataType.DATA_BYTE,
+ DataType.DATA_BYTE);
+ case 'b':
+ return new CompressionFinder(COMPRESSION_TYPE.CUSTOM_BIGDECIMAL, DataType.DATA_BYTE,
+ DataType.DATA_BYTE);
+ case 'l':
+ return new CompressionFinder(COMPRESSION_TYPE.NONE,
+ DataType.DATA_BIGINT, DataType.DATA_BIGINT);
+ default:
+ break;
+ }
+ // None Decimal
+ if (decimal == 0) {
+ if (getSize(getDataType((double) maxValue, decimal, dataTypeSelected)) > getSize(
+ getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) {
+ return new CompressionFinder(COMPRESSION_TYPE.MAX_MIN, DataType.DATA_DOUBLE,
+ getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected));
+ } else if (getSize(getDataType((double) maxValue, decimal, dataTypeSelected)) < getSize(
+ getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) {
+ return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE,
+ getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected));
+ } else {
+ return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE,
+ getDataType((double) maxValue, decimal, dataTypeSelected));
+ }
+ }
+ // decimal
+ else {
+ DataType actualDataType = getDataType((double) maxValue, decimal, dataTypeSelected);
+ DataType diffDataType =
+ getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected);
+ DataType maxNonDecDataType =
+ getDataType(Math.pow(10, decimal) * (double) maxValue, 0, dataTypeSelected);
+ DataType diffNonDecDataType =
+ getDataType(Math.pow(10, decimal) * ((double) maxValue - (double) minValue), 0,
+ dataTypeSelected);
+
+ CompressionFinder[] finders = new CompressionFinder[] {
+ new CompressionFinder(actualDataType, actualDataType, CompressionFinder.PRIORITY.ACTUAL,
+ COMPRESSION_TYPE.NONE),
+ new CompressionFinder(actualDataType, diffDataType, CompressionFinder.PRIORITY.DIFFSIZE,
+ COMPRESSION_TYPE.MAX_MIN), new CompressionFinder(actualDataType, maxNonDecDataType,
+ CompressionFinder.PRIORITY.MAXNONDECIMAL, COMPRESSION_TYPE.NON_DECIMAL_CONVERT),
+ new CompressionFinder(actualDataType, diffNonDecDataType,
+ CompressionFinder.PRIORITY.DIFFNONDECIMAL, COMPRESSION_TYPE.MAX_MIN_NDC) };
+ // sort the compressionFinder.The top have the highest priority
+ Arrays.sort(finders);
+ CompressionFinder compression = finders[0];
+ return compression;
+ }
+ }
+
+ /**
+  * Compresses the values of one double measure with the given compression type.
+  *
+  * @param compType        compression type; every type other than NONE, MAX_MIN
+  *                        and NON_DECIMAL_CONVERT is handled as MAX_MIN_NDC
+  * @param values          the data of one measure
+  * @param changedDataType the data type the values are converted to
+  * @param maxValue        the max value of one measure
+  * @param decimal         the decimal length of one measure
+  * @return the compressed data array (a primitive array of the changed type)
+  */
+ public static Object getCompressedValues(COMPRESSION_TYPE compType, double[] values,
+     DataType changedDataType, double maxValue, int decimal) {
+   switch (compType) {
+     case NONE:
+       return compressNone(changedDataType, values);
+     case MAX_MIN:
+       return compressMaxMin(changedDataType, values, maxValue);
+     case NON_DECIMAL_CONVERT:
+       return compressNonDecimal(changedDataType, values, decimal);
+     default:
+       return compressNonDecimalMaxMin(changedDataType, values, decimal, maxValue);
+   }
+ }
+
+ /**
+  * Compresses the values of one long measure. Long values are currently
+  * stored as they are, so the input array itself is returned for every
+  * compression type.
+  *
+  * @param compType        compression type; must not be null
+  * @param values          the data of one measure
+  * @param changedDataType not used for long values
+  * @param maxValue        not used for long values
+  * @param decimal         not used for long values
+  * @return the unchanged input array
+  */
+ public static Object getCompressedValues(COMPRESSION_TYPE compType, long[] values,
+     DataType changedDataType, long maxValue, int decimal) {
+   // the switch is kept so that a null compType still fails fast as before
+   switch (compType) {
+     case NONE:
+     default:
+       return values;
+   }
+ }
+
+ private static ValueCompressonHolder.UnCompressValue[] getUncompressedValues(
+ COMPRESSION_TYPE[] compType, DataType[] actualDataType, DataType[] changedDataType) {
+
+ ValueCompressonHolder.UnCompressValue[] compressValue =
+ new ValueCompressonHolder.UnCompressValue[changedDataType.length];
+ for (int i = 0; i < changedDataType.length; i++) {
+ switch (compType[i]) {
+ case NONE:
+
+ compressValue[i] = unCompressNone(changedDataType[i], actualDataType[i]);
+ break;
+
+ case MAX_MIN:
+
+ compressValue[i] = unCompressMaxMin(changedDataType[i], actualDataType[i]);
+ break;
+
+ case NON_DECIMAL_CONVERT:
+
+ compressValue[i] = unCompressNonDecimal(changedDataType[i], DataType.DATA_DOUBLE);
+ break;
+
+ case CUSTOM:
+ compressValue[i] = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BYTE_ARRAY);
+ break;
+
+ case CUSTOM_BIGDECIMAL:
+ compressValue[i] = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BIG_DECIMAL);
+ break;
+
+ default:
+ compressValue[i] = unCompressNonDecimalMaxMin(changedDataType[i], null);
+ }
+ }
+ return compressValue;
+
+ }
+
+ /**
+  * Compresses data by converting every value to another type with a plain
+  * cast, for example: double -> int.
+  *
+  * @param changedDataType the type to convert to
+  * @param value           the data of one measure
+  * @return a new primitive array of the changed type; for any type without a
+  *         dedicated branch the input array itself is returned
+  */
+ private static Object compressNone(DataType changedDataType, double[] value) {
+   switch (changedDataType) {
+     case DATA_BYTE: {
+       byte[] bytes = new byte[value.length];
+       for (int idx = 0; idx < bytes.length; idx++) {
+         bytes[idx] = (byte) value[idx];
+       }
+       return bytes;
+     }
+     case DATA_SHORT: {
+       short[] shorts = new short[value.length];
+       for (int idx = 0; idx < shorts.length; idx++) {
+         shorts[idx] = (short) value[idx];
+       }
+       return shorts;
+     }
+     case DATA_INT: {
+       int[] ints = new int[value.length];
+       for (int idx = 0; idx < ints.length; idx++) {
+         ints[idx] = (int) value[idx];
+       }
+       return ints;
+     }
+     case DATA_LONG:
+     case DATA_BIGINT: {
+       long[] longs = new long[value.length];
+       for (int idx = 0; idx < longs.length; idx++) {
+         longs[idx] = (long) value[idx];
+       }
+       return longs;
+     }
+     case DATA_FLOAT: {
+       float[] floats = new float[value.length];
+       for (int idx = 0; idx < floats.length; idx++) {
+         floats[idx] = (float) value[idx];
+       }
+       return floats;
+     }
+     default:
+       // nothing to convert: hand back the input array itself
+       return value;
+   }
+ }
+
+ /**
+  * Compresses data through the distance to the max value:
+  * 1. subValue = maxValue - value
+  * 2. subValue is cast to the changed type, for example double -> int
+  *
+  * @param changedDataType the type the sub values are converted to
+  * @param value           the data of one measure
+  * @param maxValue        the max value of the measure
+  * @return a new primitive array of the changed type (double[] for any type
+  *         without a dedicated branch)
+  */
+ private static Object compressMaxMin(DataType changedDataType, double[] value, double maxValue) {
+   switch (changedDataType) {
+     case DATA_BYTE: {
+       byte[] bytes = new byte[value.length];
+       for (int idx = 0; idx < bytes.length; idx++) {
+         bytes[idx] = (byte) (maxValue - value[idx]);
+       }
+       return bytes;
+     }
+     case DATA_SHORT: {
+       short[] shorts = new short[value.length];
+       for (int idx = 0; idx < shorts.length; idx++) {
+         shorts[idx] = (short) (maxValue - value[idx]);
+       }
+       return shorts;
+     }
+     case DATA_INT: {
+       int[] ints = new int[value.length];
+       for (int idx = 0; idx < ints.length; idx++) {
+         ints[idx] = (int) (maxValue - value[idx]);
+       }
+       return ints;
+     }
+     case DATA_LONG: {
+       long[] longs = new long[value.length];
+       for (int idx = 0; idx < longs.length; idx++) {
+         longs[idx] = (long) (maxValue - value[idx]);
+       }
+       return longs;
+     }
+     case DATA_FLOAT: {
+       float[] floats = new float[value.length];
+       for (int idx = 0; idx < floats.length; idx++) {
+         floats[idx] = (float) (maxValue - value[idx]);
+       }
+       return floats;
+     }
+     default: {
+       double[] doubles = new double[value.length];
+       for (int idx = 0; idx < doubles.length; idx++) {
+         doubles[idx] = maxValue - value[idx];
+       }
+       return doubles;
+     }
+   }
+ }
+
+ /**
+ * compress data to other type through sub value for example: 1. subValue =
+ * value * Math.pow(10, decimal) 2. subValue: double->int
+ */
+ private static Object compressNonDecimal(DataType changedDataType, double[] value, int decimal) {
+ int i = 0;
+ switch (changedDataType) {
+ case DATA_BYTE:
+ byte[] result = new byte[value.length];
+
+ for (double a : value) {
+ result[i] = (byte) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return result;
+ case DATA_SHORT:
+ short[] shortResult = new short[value.length];
+
+ for (double a : value) {
+ shortResult[i] = (short) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return shortResult;
+ case DATA_INT:
+
+ int[] intResult = new int[value.length];
+
+ for (double a : value) {
+ intResult[i] = (int) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return intResult;
+
+ case DATA_LONG:
+
+ long[] longResult = new long[value.length];
+
+ for (double a : value) {
+ longResult[i] = (long) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return longResult;
+
+ case DATA_FLOAT:
+
+ float[] floatResult = new float[value.length];
+
+ for (double a : value) {
+ floatResult[i] = (float) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return floatResult;
+
+ default:
+ double[] defaultResult = new double[value.length];
+
+ for (double a : value) {
+ defaultResult[i] = (double) (Math.round(Math.pow(10, decimal) * a));
+ i++;
+ }
+ return defaultResult;
+ }
+ }
+
+ /**
+ * compress data to other type through sub value for example: 1. subValue =
+ * maxValue - value 2. subValue = subValue * Math.pow(10, decimal) 3.
+ * subValue: double->int
+ */
+ private static Object compressNonDecimalMaxMin(DataType changedDataType, double[] value,
+ int decimal, double maxValue) {
+ int i = 0;
+ switch (changedDataType) {
+ case DATA_BYTE:
+
+ byte[] result = new byte[value.length];
+
+ for (double a : value) {
+ result[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return result;
+
+ case DATA_SHORT:
+
+ short[] shortResult = new short[value.length];
+
+ for (double a : value) {
+ shortResult[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return shortResult;
+
+ case DATA_INT:
+
+ int[] intResult = new int[value.length];
+
+ for (double a : value) {
+ intResult[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return intResult;
+
+ case DATA_LONG:
+
+ long[] longResult = new long[value.length];
+
+ for (double a : value) {
+ longResult[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return longResult;
+
+ case DATA_FLOAT:
+
+ float[] floatResult = new float[value.length];
+
+ for (double a : value) {
+ floatResult[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return floatResult;
+
+ default:
+
+ double[] defaultResult = new double[value.length];
+
+ for (double a : value) {
+ defaultResult[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+ i++;
+ }
+ return defaultResult;
+
+ }
+ }
+
+ /**
+  * Creates the holder that uncompresses plainly cast data, for example
+  * int -> double.
+  *
+  * @param compDataType   the data type the values were compressed to
+  * @param actualDataType the actual data type; DATA_BIGINT always selects the
+  *                       long holder, regardless of compDataType
+  * @return the matching uncompress holder
+  */
+ public static ValueCompressonHolder.UnCompressValue unCompressNone(DataType compDataType,
+     DataType actualDataType) {
+   if (actualDataType == DataType.DATA_BIGINT) {
+     return new UnCompressDefaultLong();
+   }
+   final ValueCompressonHolder.UnCompressValue holder;
+   switch (compDataType) {
+     case DATA_BYTE:
+       holder = new UnCompressNoneByte();
+       break;
+     case DATA_SHORT:
+       holder = new UnCompressNoneShort();
+       break;
+     case DATA_INT:
+       holder = new UnCompressNoneInt();
+       break;
+     case DATA_LONG:
+       holder = new UnCompressNoneLong();
+       break;
+     case DATA_FLOAT:
+       holder = new UnCompressNoneFloat();
+       break;
+     default:
+       holder = new UnCompressNoneDefault();
+       break;
+   }
+   return holder;
+ }
+
+ /**
+  * Creates the holder that uncompresses MAX_MIN data:
+  * 1. value = maxValue - subValue
+  * 2. value: int -> double
+  *
+  * @param compDataType   the data type the values were compressed to
+  * @param actualDataType not used by this method
+  * @return the matching uncompress holder
+  */
+ public static ValueCompressonHolder.UnCompressValue unCompressMaxMin(DataType compDataType,
+     DataType actualDataType) {
+   final ValueCompressonHolder.UnCompressValue holder;
+   switch (compDataType) {
+     case DATA_BYTE:
+       holder = new UnCompressMaxMinByte();
+       break;
+     case DATA_SHORT:
+       holder = new UnCompressMaxMinShort();
+       break;
+     case DATA_INT:
+       holder = new UnCompressMaxMinInt();
+       break;
+     case DATA_LONG:
+       holder = new UnCompressMaxMinLong();
+       break;
+     case DATA_FLOAT:
+       holder = new UnCompressMaxMinFloat();
+       break;
+     default:
+       holder = new UnCompressMaxMinDefault();
+       break;
+   }
+   return holder;
+ }
+
+ /**
+  * Creates the holder that uncompresses NON_DECIMAL_CONVERT data:
+  * value = value / Math.pow(10, decimal)
+  *
+  * @param compDataType   the data type the values were compressed to
+  * @param actualDataType not used by this method
+  * @return the matching uncompress holder
+  */
+ public static ValueCompressonHolder.UnCompressValue unCompressNonDecimal(DataType compDataType,
+     DataType actualDataType) {
+   final ValueCompressonHolder.UnCompressValue holder;
+   switch (compDataType) {
+     case DATA_BYTE:
+       holder = new UnCompressNonDecimalByte();
+       break;
+     case DATA_SHORT:
+       holder = new UnCompressNonDecimalShort();
+       break;
+     case DATA_INT:
+       holder = new UnCompressNonDecimalInt();
+       break;
+     case DATA_LONG:
+       holder = new UnCompressNonDecimalLong();
+       break;
+     case DATA_FLOAT:
+       holder = new UnCompressNonDecimalFloat();
+       break;
+     default:
+       holder = new UnCompressNonDecimalDefault();
+       break;
+   }
+   return holder;
+ }
+
+ /**
+  * Creates the holder that uncompresses MAX_MIN_NDC data:
+  * value = (maxValue - subValue) / Math.pow(10, decimal)
+  *
+  * @param compDataType   the data type the values were compressed to
+  * @param actualDataType not used by this method
+  * @return the matching uncompress holder
+  */
+ public static ValueCompressonHolder.UnCompressValue unCompressNonDecimalMaxMin(
+     DataType compDataType, DataType actualDataType) {
+   final ValueCompressonHolder.UnCompressValue holder;
+   switch (compDataType) {
+     case DATA_BYTE:
+       holder = new UnCompressNonDecimalMaxMinByte();
+       break;
+     case DATA_SHORT:
+       holder = new UnCompressNonDecimalMaxMinShort();
+       break;
+     case DATA_INT:
+       holder = new UnCompressNonDecimalMaxMinInt();
+       break;
+     case DATA_LONG:
+       holder = new UnCompressNonDecimalMaxMinLong();
+       break;
+     case DATA_FLOAT:
+       holder = new UnCompressNonDecimalMaxMinFloat();
+       break;
+     default:
+       holder = new UnCompressNonDecimalMaxMinDefault();
+       break;
+   }
+   return holder;
+ }
+
+ /**
+  * Creates the value compression model from the per-measure metadata arrays.
+  * The arrays are wrapped into a MeasureMetaDataModel whose measure count is
+  * the length of maxValue.
+  *
+  * @param maxValue         max value of every measure
+  * @param minValue         min value of every measure
+  * @param decimalLength    decimal length of every measure
+  * @param uniqueValue      unique value of every measure
+  * @param aggType          aggregator type of every measure
+  * @param dataTypeSelected selected data type flag of every measure
+  * @return the value compression model built from the metadata
+  */
+ public static ValueCompressionModel getValueCompressionModel(Object[] maxValue, Object[] minValue,
+     int[] decimalLength, Object[] uniqueValue, char[] aggType, byte[] dataTypeSelected) {
+   // note: the metadata model takes the min values before the max values
+   return getValueCompressionModel(
+       new MeasureMetaDataModel(minValue, maxValue, decimalLength, maxValue.length, uniqueValue,
+           aggType, dataTypeSelected));
+ }
+
+ /**
+ * Creates the value compression model from the measure metadata. For every
+ * measure the compression type and the actual/changed data types are chosen
+ * by getCompressionType; afterwards the matching uncompress holders are
+ * created and attached to the model.
+ *
+ * @param measureMDMdl the metadata (min/max/unique values, decimals,
+ * aggregator types, selected data types) of all measures
+ * @return the populated value compression model
+ */
+ public static ValueCompressionModel getValueCompressionModel(MeasureMetaDataModel measureMDMdl) {
+ int measureCount = measureMDMdl.getMeasureCount();
+ Object[] minValue = measureMDMdl.getMinValue();
+ Object[] maxValue = measureMDMdl.getMaxValue();
+ Object[] uniqueValue = measureMDMdl.getUniqueValue();
+ int[] decimal = measureMDMdl.getDecimal();
+ char[] type = measureMDMdl.getType();
+ byte[] dataTypeSelected = measureMDMdl.getDataTypeSelected();
+ ValueCompressionModel compressionModel = new ValueCompressionModel();
+ DataType[] actualType = new DataType[measureCount];
+ DataType[] changedType = new DataType[measureCount];
+ COMPRESSION_TYPE[] compType = new COMPRESSION_TYPE[measureCount];
+ // pick the compression of every measure independently
+ for (int i = 0; i < measureCount; i++) {
+ CompressionFinder compresssionFinder = ValueCompressionUtil
+ .getCompressionType(maxValue[i], minValue[i], decimal[i], type[i], dataTypeSelected[i]);
+ actualType[i] = compresssionFinder.actualDataType;
+ changedType[i] = compresssionFinder.changedDataType;
+ compType[i] = compresssionFinder.compType;
+ }
+ compressionModel.setMaxValue(maxValue);
+ compressionModel.setDecimal(decimal);
+ compressionModel.setChangedDataType(changedType);
+ compressionModel.setCompType(compType);
+ compressionModel.setActualDataType(actualType);
+ compressionModel.setMinValue(minValue);
+ compressionModel.setUniqueValue(uniqueValue);
+ compressionModel.setType(type);
+ compressionModel.setMinValueFactForAgg(measureMDMdl.getMinValueFactForAgg());
+ compressionModel.setDataTypeSelected(dataTypeSelected);
+ // the holders are derived from the types that were just stored in the model
+ ValueCompressonHolder.UnCompressValue[] values = ValueCompressionUtil
+ .getUncompressedValues(compressionModel.getCompType(), compressionModel.getActualDataType(),
+ compressionModel.getChangedDataType());
+ compressionModel.setUnCompressValues(values);
+ return compressionModel;
+ }
+
+ /**
+  * Serializes the shorts into a byte array, 2 bytes per value, in the
+  * default (big-endian) byte order of ByteBuffer.
+  *
+  * @param values the values to serialize
+  * @return a new byte array of length values.length * 2
+  */
+ public static byte[] convertToBytes(short[] values) {
+   ByteBuffer target = ByteBuffer.allocate(values.length * 2);
+   // bulk write through a typed view that shares the backing array
+   target.asShortBuffer().put(values);
+   return target.array();
+ }
+
+ /**
+  * Serializes the ints into a byte array, 4 bytes per value, in the
+  * default (big-endian) byte order of ByteBuffer.
+  *
+  * @param values the values to serialize
+  * @return a new byte array of length values.length * 4
+  */
+ public static byte[] convertToBytes(int[] values) {
+   ByteBuffer target = ByteBuffer.allocate(values.length * 4);
+   // bulk write through a typed view that shares the backing array
+   target.asIntBuffer().put(values);
+   return target.array();
+ }
+
+ /**
+  * Serializes the floats into a byte array, 4 bytes per value, in the
+  * default (big-endian) byte order of ByteBuffer.
+  *
+  * @param values the values to serialize
+  * @return a new byte array of length values.length * 4
+  */
+ public static byte[] convertToBytes(float[] values) {
+   ByteBuffer target = ByteBuffer.allocate(values.length * 4);
+   // bulk write through a typed view that shares the backing array
+   target.asFloatBuffer().put(values);
+   return target.array();
+ }
+
+ /**
+  * Serializes the longs into a byte array, 8 bytes per value, in the
+  * default (big-endian) byte order of ByteBuffer.
+  *
+  * @param values the values to serialize
+  * @return a new byte array of length values.length * 8
+  */
+ public static byte[] convertToBytes(long[] values) {
+   ByteBuffer target = ByteBuffer.allocate(values.length * 8);
+   // bulk write through a typed view that shares the backing array
+   target.asLongBuffer().put(values);
+   return target.array();
+ }
+
+ /**
+  * Serializes the doubles into a byte array, 8 bytes per value, in the
+  * default (big-endian) byte order of ByteBuffer.
+  *
+  * @param values the values to serialize
+  * @return a new byte array of length values.length * 8
+  */
+ public static byte[] convertToBytes(double[] values) {
+   ByteBuffer target = ByteBuffer.allocate(values.length * 8);
+   // bulk write through a typed view that shares the backing array
+   target.asDoubleBuffer().put(values);
+   return target.array();
+ }
+
+ /**
+  * Reads shorts from the start of the buffer. The buffer is rewound first and
+  * its position is advanced by the bytes that are read.
+  *
+  * @param buffer the buffer to read from
+  * @param length the number of bytes to read; length / 2 values are returned
+  * @return the values read from the buffer
+  */
+ public static short[] convertToShortArray(ByteBuffer buffer, int length) {
+   buffer.rewind();
+   final int count = length / 2;
+   short[] decoded = new short[count];
+   int idx = 0;
+   while (idx < count) {
+     decoded[idx++] = buffer.getShort();
+   }
+   return decoded;
+ }
+
+ /**
+  * Reads ints from the start of the buffer. The buffer is rewound first and
+  * its position is advanced by the bytes that are read.
+  *
+  * @param buffer the buffer to read from
+  * @param length the number of bytes to read; length / 4 values are returned
+  * @return the values read from the buffer
+  */
+ public static int[] convertToIntArray(ByteBuffer buffer, int length) {
+   buffer.rewind();
+   final int count = length / 4;
+   int[] decoded = new int[count];
+   int idx = 0;
+   while (idx < count) {
+     decoded[idx++] = buffer.getInt();
+   }
+   return decoded;
+ }
+
+ /**
+  * Reads floats from the start of the buffer. The buffer is rewound first and
+  * its position is advanced by the bytes that are read.
+  *
+  * @param buffer the buffer to read from
+  * @param length the number of bytes to read; length / 4 values are returned
+  * @return the values read from the buffer
+  */
+ public static float[] convertToFloatArray(ByteBuffer buffer, int length) {
+   buffer.rewind();
+   final int count = length / 4;
+   float[] decoded = new float[count];
+   int idx = 0;
+   while (idx < count) {
+     decoded[idx++] = buffer.getFloat();
+   }
+   return decoded;
+ }
+
+ /**
+  * Reads longs from the start of the buffer. The buffer is rewound first and
+  * its position is advanced by the bytes that are read.
+  *
+  * @param buffer the buffer to read from
+  * @param length the number of bytes to read; length / 8 values are returned
+  * @return the values read from the buffer
+  */
+ public static long[] convertToLongArray(ByteBuffer buffer, int length) {
+   buffer.rewind();
+   final int count = length / 8;
+   long[] decoded = new long[count];
+   int idx = 0;
+   while (idx < count) {
+     decoded[idx++] = buffer.getLong();
+   }
+   return decoded;
+ }
+
+ /**
+  * Reads doubles from the start of the buffer. The buffer is rewound first and
+  * its position is advanced by the bytes that are read.
+  *
+  * @param buffer the buffer to read from
+  * @param length the number of bytes to read; length / 8 values are returned
+  * @return the values read from the buffer
+  */
+ public static double[] convertToDoubleArray(ByteBuffer buffer, int length) {
+   buffer.rewind();
+   final int count = length / 8;
+   double[] decoded = new double[count];
+   int idx = 0;
+   while (idx < count) {
+     decoded[idx++] = buffer.getDouble();
+   }
+   return decoded;
+ }
+
+ /**
+ * use to identify compression type.
+ */
+ public static enum COMPRESSION_TYPE {
+ /**
+ * values are only cast to the changed data type
+ */
+ NONE,
+ /**
+ * the distance to the max value (maxValue - value) is stored
+ */
+ MAX_MIN,
+ /**
+ * the value scaled by Math.pow(10, decimal) and rounded is stored
+ */
+ NON_DECIMAL_CONVERT,
+ /**
+ * the distance to the max value, scaled by Math.pow(10, decimal) and
+ * rounded, is stored
+ */
+ MAX_MIN_NDC,
+
+ /**
+ * custom byte array storage, chosen for aggregator type 'c'
+ */
+ CUSTOM,
+
+ /**
+ * custom big decimal byte array storage, chosen for aggregator type 'b'
+ */
+ CUSTOM_BIGDECIMAL
+ }
+
+ /**
+ * use to identify the type of data.
+ */
+ public static enum DataType {
+ /**
+ * byte, 1 byte per value
+ */
+ DATA_BYTE(),
+ /**
+ * short, 2 bytes per value
+ */
+ DATA_SHORT(),
+ /**
+ * int, 4 bytes per value
+ */
+ DATA_INT(),
+ /**
+ * float, 4 bytes per value
+ */
+ DATA_FLOAT(),
+ /**
+ * long, 8 bytes per value
+ */
+ DATA_LONG(),
+ /**
+ * big int, chosen for aggregator type 'l'; 8 bytes per value
+ */
+ DATA_BIGINT(),
+ /**
+ * double, 8 bytes per value
+ */
+ DATA_DOUBLE();
+
+ /**
+ * DataType.
+ */
+ private DataType() {
+ // the constants carry no state; sizes are provided by getSize(DataType)
+ }
+
+ }
+
+ /**
+ * Candidate compression for one measure. Candidates are ordered by the size
+ * of the changed data type and then by priority, so that sorting an array of
+ * candidates puts the best compression type first.
+ */
+ private static class CompressionFinder implements Comparable<CompressionFinder> {
+ /**
+ * compType.
+ */
+ private COMPRESSION_TYPE compType;
+ /**
+ * actualDataType.
+ */
+ private DataType actualDataType;
+ /**
+ * changedDataType.
+ */
+ private DataType changedDataType;
+ /**
+ * the size of changed data; only set by the constructor that takes a
+ * priority
+ */
+ private int size;
+ /**
+ * priority; only set by the constructor that takes a priority
+ */
+ private PRIORITY priority;
+
+ /**
+ * CompressionFinder constructor for an already decided compression; size
+ * and priority are left at their defaults.
+ *
+ * @param compType
+ * @param actualDataType
+ * @param changedDataType
+ */
+ CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
+ DataType changedDataType) {
+ super();
+ this.compType = compType;
+ this.actualDataType = actualDataType;
+ this.changedDataType = changedDataType;
+ }
+
+ /**
+ * CompressionFinder overloaded constructor for a sortable candidate; the
+ * size is derived from the changed data type.
+ *
+ * @param actualDataType
+ * @param changedDataType
+ * @param priority
+ * @param compType
+ */
+
+ CompressionFinder(DataType actualDataType, DataType changedDataType, PRIORITY priority,
+ COMPRESSION_TYPE compType) {
+ super();
+ this.actualDataType = actualDataType;
+ this.changedDataType = changedDataType;
+ this.size = getSize(changedDataType);
+ this.priority = priority;
+ this.compType = compType;
+ }
+
+ /**
+ * Two finders are equal when both size and priority match; the data types
+ * and the compression type are not compared.
+ */
+ @Override public boolean equals(Object obj) {
+ boolean equals = false;
+ if (obj instanceof CompressionFinder) {
+ CompressionFinder cf = (CompressionFinder) obj;
+
+ if (this.size == cf.size && this.priority == cf.priority) {
+ equals = true;
+ }
+
+ }
+ return equals;
+ }
+
+ /**
+ * Hash code built from size and priority, the same fields equals uses.
+ */
+ @Override public int hashCode() {
+ final int code = 31;
+ int result = 1;
+
+ result = code * result + this.size;
+ result = code * result + ((priority == null) ? 0 : priority.hashCode());
+ return result;
+ }
+
+ /**
+ * Orders by size ascending and, for equal sizes, by priority value
+ * ascending, so the smallest changed data type sorts first.
+ */
+ @Override public int compareTo(CompressionFinder o) {
+ int returnVal = 0;
+ // the smaller size sorts first, i.e. it is preferred
+ if (this.equals(o)) {
+ returnVal = 0;
+ } else if (this.size == o.size) {
+ // the compression type priority: the lower value sorts first
+ if (priority.priority > o.priority.priority) {
+ returnVal = 1;
+ } else if (priority.priority < o.priority.priority) {
+ returnVal = -1;
+ }
+
+ } else if (this.size > o.size) {
+ returnVal = 1;
+ } else {
+ returnVal = -1;
+ }
+ return returnVal;
+ }
+
+ /**
+ * Compression type priority.
+ * ACTUAL is the highest priority and DIFFNONDECIMAL is the lowest
+ * priority; a lower number means a higher priority.
+ */
+ static enum PRIORITY {
+ /**
+ * the actual data type, used with COMPRESSION_TYPE.NONE
+ */
+ ACTUAL(0),
+ /**
+ * the type of (max - min), used with COMPRESSION_TYPE.MAX_MIN
+ */
+ DIFFSIZE(1),
+ /**
+ * the type of the scaled max value, used with
+ * COMPRESSION_TYPE.NON_DECIMAL_CONVERT
+ */
+ MAXNONDECIMAL(2),
+ /**
+ * the type of the scaled (max - min), used with
+ * COMPRESSION_TYPE.MAX_MIN_NDC
+ */
+ DIFFNONDECIMAL(3);
+
+ /**
+ * priority.
+ */
+ private int priority;
+
+ private PRIORITY(int priority) {
+ this.priority = priority;
+ }
+ }
+ }
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java b/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
new file mode 100644
index 0000000..73bbf11
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Holds an mdkey together with its primary key. Ordering, equality and the
+ * hash code are all based on the mdkey only, so the three stay consistent
+ * with each other.
+ */
+public class ByteArrayHolder implements Comparable<ByteArrayHolder> {
+
+ /**
+  * mdkey
+  */
+ private final byte[] mdKey;
+
+ /**
+  * primary key
+  */
+ private final int primaryKey;
+
+ /**
+  * @param mdKey      the mdkey; the array is stored as is, not copied
+  * @param primaryKey the primary key that belongs to the mdkey
+  */
+ public ByteArrayHolder(byte[] mdKey, int primaryKey) {
+   this.mdKey = mdKey;
+   this.primaryKey = primaryKey;
+ }
+
+ /**
+  * Orders holders by their mdkey, as defined by ByteUtil.compare.
+  */
+ @Override public int compareTo(ByteArrayHolder o) {
+   return ByteUtil.compare(mdKey, o.mdKey);
+ }
+
+ /**
+  * Two holders are equal when their mdkeys compare as equal; the primary key
+  * is not taken into account.
+  */
+ @Override public boolean equals(Object obj) {
+   return obj instanceof ByteArrayHolder
+       && 0 == ByteUtil.compare(mdKey, ((ByteArrayHolder) obj).mdKey);
+ }
+
+ /**
+  * Hash code derived from the mdkey only. The primary key must not be mixed
+  * in: equals ignores it, and equal objects have to produce equal hash codes.
+  * NOTE(review): assumes ByteUtil.compare returns 0 only for mdkeys with the
+  * same content - confirm against ByteUtil.
+  */
+ @Override public int hashCode() {
+   final int prime = 31;
+   return prime * Arrays.hashCode(mdKey);
+ }
+
+ public byte[] getMdKey() {
+   return mdKey;
+ }
+
+ public int getPrimaryKey() {
+   return primaryKey;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
new file mode 100644
index 0000000..24ea06e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * dictionary writer interface
+ */
+public interface CarbonDictionaryWriter extends Closeable {
+ /**
+ * write method that accepts one value at a time as a String
+ * This method can be used when data is huge and memory is less. In that
+ * case data can be stored to a file and an iterator can iterate over it and
+ * pass one value at a time
+ *
+ * @param value unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ void write(String value) throws IOException;
+
+ /**
+ * write method that accepts one value at a time as a byte array
+ * This method can be used when data is huge and memory is less. In that
+ * case data can be stored to a file and an iterator can iterate over it and
+ * pass one value at a time
+ *
+ * @param value unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ void write(byte[] value) throws IOException;
+
+ /**
+ * write method that accepts list of byte arrays as value
+ * This can be used when data is less, then string can be converted
+ * to byte array for each value and added to a list
+ *
+ * @param valueList list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ void write(List<byte[]> valueList) throws IOException;
+
+ /**
+ * commit the dictionary data written through this writer
+ * NOTE(review): CarbonDictionaryWriterImpl writes the dictionary metadata file in this
+ * method; the required call order relative to close() is not stated here - confirm
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ void commit() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
new file mode 100644
index 0000000..2e08610
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.ColumnDictionaryChunk;
+import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class is responsible for writing the dictionary file and its metadata
+ */
+public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonDictionaryWriterImpl.class.getName());
+
+ /**
+ * carbon type identifier
+ */
+ protected CarbonTableIdentifier carbonTableIdentifier;
+
+ /**
+ * list which will hold values upto maximum of one dictionary chunk size
+ */
+ private List<ByteBuffer> oneDictionaryChunkList;
+
+ /**
+ * Meta object which will hold last segment entry details
+ */
+ private CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry;
+
+ /**
+ * dictionary file and meta thrift writer
+ */
+ private ThriftWriter dictionaryThriftWriter;
+
+ /**
+ * column identifier
+ */
+ protected ColumnIdentifier columnIdentifier;
+
+ /**
+ * HDFS store path
+ */
+ protected String hdfsStorePath;
+
+ /**
+ * dictionary file path
+ */
+ protected String dictionaryFilePath;
+
+ /**
+ * dictionary metadata file path
+ */
+ protected String dictionaryMetaFilePath;
+
+ /**
+ * start offset of dictionary chunk for a segment
+ */
+ private long chunk_start_offset;
+
+ /**
+ * end offset of a dictionary chunk for a segment
+ */
+ private long chunk_end_offset;
+
+ /**
+ * total dictionary value record count for one segment
+ */
+ private int totalRecordCount;
+
+ /**
+ * total thrift object chunk count written for one segment
+ */
+ private int chunk_count;
+
+ /**
+ * chunk size for a dictionary file after which data will be written to disk
+ */
+ private int dictionary_one_chunk_size;
+
+ /**
+ * flag to check whether write method is called for first time
+ */
+ private boolean isFirstTime;
+
+ private static final Charset defaultCharset = Charset.forName(
+ CarbonCommonConstants.DEFAULT_CHARSET);
+
+  /**
+   * Creates a writer for the dictionary of one column. Only the arguments are stored
+   * here; the files are resolved and opened on the first write call.
+   *
+   * @param hdfsStorePath         HDFS store path
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   */
+  public CarbonDictionaryWriterImpl(String hdfsStorePath,
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+    this.hdfsStorePath = hdfsStorePath;
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    // init() is deferred until a write method is called for the first time
+    this.isFirstTime = true;
+  }
+
+ /**
+ * This method will write the data in thrift format to disk. This method will be guided by
+ * parameter dictionary_one_chunk_size and data will be divided into chunks
+ * based on this parameter
+ *
+ * @param value unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void write(String value) throws IOException {
+ write(value.getBytes(defaultCharset));
+ }
+
+ /**
+ * This method will write the data in thrift format to disk. This method will be guided by
+ * parameter dictionary_one_chunk_size and data will be divided into chunks
+ * based on this parameter
+ *
+ * @param value unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void write(byte[] value) throws IOException {
+ if (isFirstTime) {
+ init();
+ isFirstTime = false;
+ }
+ // if one chunk size is equal to list size then write the data to file
+ checkAndWriteDictionaryChunkToFile();
+ oneDictionaryChunkList.add(ByteBuffer.wrap(value));
+ totalRecordCount++;
+ }
+
+ /**
+ * This method will write the data in thrift format to disk. This method will not be guided by
+ * parameter dictionary_one_chunk_size and complete data will be written as one chunk
+ *
+ * @param valueList list of byte array. Each byte array is unique dictionary value
+ * @throws IOException if an I/O error occurs
+ */
+ @Override public void write(List<byte[]> valueList) throws IOException {
+ if (isFirstTime) {
+ init();
+ isFirstTime = false;
+ }
+ for (byte[] value : valueList) {
+ oneDictionaryChunkList.add(ByteBuffer.wrap(value));
+ totalRecordCount++;
+ }
+ }
+
+  /**
+   * Writes the values still buffered in memory to the dictionary file as the last chunk
+   * and closes the thrift writer. Nothing is done when the thrift writer was never
+   * opened, i.e. no write method was called. The dictionary metadata file is not
+   * written here; that happens in commit().
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    if (null == dictionaryThriftWriter) {
+      return;
+    }
+    try {
+      writeDictionaryFile();
+    } finally {
+      // release the thrift writer even when the final chunk cannot be written
+      closeThriftWriter();
+    }
+  }
+
+  /**
+   * Flushes the in-memory chunk to the dictionary file and starts a new chunk once the
+   * number of buffered values has reached dictionary_one_chunk_size
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void checkAndWriteDictionaryChunkToFile() throws IOException {
+    if (oneDictionaryChunkList.size() < dictionary_one_chunk_size) {
+      // threshold not reached yet, keep buffering
+      return;
+    }
+    writeDictionaryFile();
+    createChunkList();
+  }
+
+  /**
+   * Wraps the currently buffered values in a ColumnDictionaryChunk thrift object and
+   * writes it through the open thrift writer
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void writeDictionaryFile() throws IOException {
+    ColumnDictionaryChunk chunkToWrite = new ColumnDictionaryChunk();
+    chunkToWrite.setValues(oneDictionaryChunkList);
+    writeThriftObject(chunkToWrite);
+  }
+
+  /**
+   * Prepares the writer before the first value is buffered: reads the chunk size,
+   * resolves the file paths, reconciles an existing dictionary file with its metadata
+   * file, opens the thrift writer on the dictionary file and creates the first chunk list
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void init() throws IOException {
+    initDictionaryChunkSize();
+    initPaths();
+    if (CarbonUtil.isFileExists(this.dictionaryFilePath)) {
+      if (CarbonUtil.isFileExists(this.dictionaryMetaFilePath)) {
+        // dictionary and metadata file both exist: the new segment starts at the
+        // current end of the dictionary file
+        this.chunk_start_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
+        validateDictionaryFileOffsetWithLastSegmentEntryOffset();
+      } else {
+        // dictionary file without a metadata file: delete it
+        FileFactory.FileType fileType = FileFactory.getFileType(dictionaryFilePath);
+        FileFactory.getCarbonFile(dictionaryFilePath, fileType).delete();
+      }
+    }
+    openThriftWriter(this.dictionaryFilePath);
+    createChunkList();
+  }
+
+  /**
+   * Resolves the dictionary file path and the dictionary metadata file path of the
+   * column through the CarbonTablePath obtained from the path service
+   */
+  protected void initPaths() {
+    CarbonTablePath tablePath = CarbonCommonFactory.getPathService()
+        .getCarbonTablePath(columnIdentifier, this.hdfsStorePath, carbonTableIdentifier);
+    this.dictionaryFilePath = tablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    this.dictionaryMetaFilePath =
+        tablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
+  }
+
+  /**
+   * initialize the number of dictionary values that can be kept in memory at a time.
+   * The value is read from property CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE;
+   * when it is not a number, or not a positive number, the default
+   * CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT is taken and an error is logged
+   */
+  private void initDictionaryChunkSize() {
+    int configuredSize;
+    try {
+      configuredSize = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE,
+              CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT));
+    } catch (NumberFormatException e) {
+      // treated like any other invalid size below
+      configuredSize = 0;
+    }
+    // a negative size makes the chunk list allocation fail and a size of zero would
+    // flush an empty chunk before every value, so both count as misconfiguration
+    if (configuredSize <= 0) {
+      configuredSize =
+          Integer.parseInt(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT);
+      LOGGER.error("Dictionary chunk size not configured properly. Taking default size "
+          + configuredSize);
+    }
+    dictionary_one_chunk_size = configuredSize;
+  }
+
+ /**
+ * Starts a new in-memory chunk: replaces oneDictionaryChunkList with an empty list
+ * pre-sized to dictionary_one_chunk_size and increments chunk_count.
+ * chunk_count is therefore incremented when a chunk list is created, not when it
+ * is written to the file.
+ */
+ private void createChunkList() {
+ this.oneDictionaryChunkList = new ArrayList<ByteBuffer>(dictionary_one_chunk_size);
+ chunk_count++;
+ }
+
+  /**
+   * if file already exists then read metadata file and
+   * validate the last entry end offset with file size. If the file is longer than
+   * the end offset of the last metadata entry, some invalid data is present which needs
+   * to be truncated
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void validateDictionaryFileOffsetWithLastSegmentEntryOffset() throws IOException {
+    // read last dictionary chunk meta entry from dictionary metadata file
+    chunkMetaObjectForLastSegmentEntry = getChunkMetaObjectForLastSegmentEntry();
+    if (null == chunkMetaObjectForLastSegmentEntry) {
+      // no metadata entry available, nothing to validate against
+      return;
+    }
+    // keep the difference as long: narrowing it to int could wrap to a negative
+    // value for large files and silently skip the truncation
+    long bytesToTruncate =
+        chunk_start_offset - chunkMetaObjectForLastSegmentEntry.getEnd_offset();
+    if (bytesToTruncate > 0) {
+      LOGGER.info("some inconsistency in dictionary file for column " + this.columnIdentifier);
+      // truncate the dictionary data till chunk meta end offset
+      // NOTE(review): chunk_start_offset still holds the file size measured before the
+      // truncation; confirm whether it should be reset to the metadata end offset
+      FileFactory.FileType fileType = FileFactory.getFileType(this.dictionaryFilePath);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(this.dictionaryFilePath, fileType);
+      boolean truncateSuccess = carbonFile
+          .truncate(this.dictionaryFilePath, chunkMetaObjectForLastSegmentEntry.getEnd_offset());
+      if (!truncateSuccess) {
+        LOGGER.info("Diction file not truncated successfully for column " + this.columnIdentifier);
+      }
+    }
+  }
+
+  /**
+   * This method will write the dictionary metadata file for a given column. One
+   * ColumnDictionaryChunkMeta entry is written through a thrift writer opened on the
+   * metadata file, and that writer is closed again even when the write fails.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void writeDictionaryMetadataFile() throws IOException {
+    // Format of dictionary metadata file
+    // min, max, start offset, end offset and chunk count
+    int min_surrogate_key = 0;
+    int max_surrogate_key = 0;
+    // case 1: first time dictionary writing
+    // previousMax = 0, totalRecordCount = 5, min = 1, max= 5
+    // case2: file already exists
+    // previousMax = 5, totalRecordCount = 10, min = 6, max = 15
+    // case 3: no unique values, total records 0
+    // previousMax = 15, totalRecordCount = 0, min = 15, max = 15
+    // both min and max equal to previous max
+    if (null != chunkMetaObjectForLastSegmentEntry) {
+      if (0 == totalRecordCount) {
+        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key();
+      } else {
+        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + 1;
+      }
+      max_surrogate_key =
+          chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + totalRecordCount;
+    } else {
+      if (totalRecordCount > 0) {
+        min_surrogate_key = 1;
+      }
+      max_surrogate_key = totalRecordCount;
+    }
+    ColumnDictionaryChunkMeta dictionaryChunkMeta =
+        new ColumnDictionaryChunkMeta(min_surrogate_key, max_surrogate_key, chunk_start_offset,
+            chunk_end_offset, chunk_count);
+    openThriftWriter(this.dictionaryMetaFilePath);
+    try {
+      // write dictionary metadata file
+      writeThriftObject(dictionaryChunkMeta);
+    } finally {
+      // do not leave the metadata writer open when the write fails
+      closeThriftWriter();
+    }
+    LOGGER.info("Dictionary metadata file written successfully for column " + this.columnIdentifier
+        + " at path " + this.dictionaryMetaFilePath);
+  }
+
+ /**
+ * open thrift writer for writing dictionary chunk/meta object.
+ * The new writer replaces the one held in dictionaryThriftWriter; the same field is
+ * used for the dictionary file and for the dictionary metadata file.
+ *
+ * @param dictionaryFile can be dictionary file name or dictionary metadata file name
+ * @throws IOException if an I/O error occurs
+ */
+ private void openThriftWriter(String dictionaryFile) throws IOException {
+ // create thrift writer instance; the second argument is the writer's append flag
+ dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
+ // open the file stream
+ dictionaryThriftWriter.open();
+ }
+
+ /**
+ * This method will write the thrift object to the file of the thrift writer that is
+ * currently held in dictionaryThriftWriter. The writer must have been opened before.
+ *
+ * @param dictionaryThriftObject can be dictionary thrift object or dictionary metadata
+ * thrift object
+ * @throws IOException if an I/O error occurs
+ */
+ private void writeThriftObject(TBase dictionaryThriftObject) throws IOException {
+ dictionaryThriftWriter.write(dictionaryThriftObject);
+ }
+
+  /**
+   * Closes the thrift writer currently held by this object, if there is one. The
+   * field itself is left untouched.
+   */
+  private void closeThriftWriter() {
+    if (null == dictionaryThriftWriter) {
+      return;
+    }
+    dictionaryThriftWriter.close();
+  }
+
+  /**
+   * Reads the last entry of the dictionary metadata file through a metadata reader
+   * and closes the reader afterwards, whether or not the read succeeded
+   *
+   * @return last entry of dictionary meta chunk
+   * @throws IOException if an I/O error occurs
+   */
+  private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
+      throws IOException {
+    CarbonDictionaryMetadataReader metadataReader = getDictionaryMetadataReader();
+    try {
+      // read the last segment entry for dictionary metadata
+      return metadataReader.readLastEntryOfDictionaryMetaChunk();
+    } finally {
+      // Close metadata reader
+      metadataReader.close();
+    }
+  }
+
+ /**
+ * Creates the reader used to read the dictionary metadata file of this column.
+ * Protected, presumably so that subclasses can supply a different reader - confirm.
+ *
+ * @return a new CarbonDictionaryMetadataReaderImpl built from the store path, table
+ * identifier and column identifier of this writer
+ */
+ protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+ return new CarbonDictionaryMetadataReaderImpl(hdfsStorePath, carbonTableIdentifier,
+ columnIdentifier);
+ }
+
+  /**
+   * Records the current size of the dictionary file as the end offset of this segment
+   * and writes the dictionary metadata file. Nothing is done when the thrift writer
+   * was never opened, i.e. no write method was called.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void commit() throws IOException {
+    if (null == dictionaryThriftWriter) {
+      return;
+    }
+    this.chunk_end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
+    writeDictionaryMetadataFile();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
new file mode 100644
index 0000000..04d2b97
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.FileFooter;
+
+/**
+ * Writes metadata block to the fact table file in thrift
+ * format org.apache.carbondata.format.FileFooter
+ */
+public class CarbonFooterWriter {
+
+  // It is version number of this format class.
+  private static final int VERSION_NUMBER = 1;
+
+  // Fact file path
+  private final String filePath;
+
+  /**
+   * @param filePath path of the fact file the footer is written to
+   */
+  public CarbonFooterWriter(String filePath) {
+    this.filePath = filePath;
+  }
+
+  /**
+   * It writes FileFooter thrift format object to file. The version of the footer is
+   * set to VERSION_NUMBER before writing, and the given position is written after the
+   * footer. The thrift writer is closed whether or not the write succeeds.
+   *
+   * @param footer          footer object to write; its version field is overwritten
+   * @param currentPosition At where this metadata is going to be written.
+   * @throws IOException if the file cannot be opened or written
+   */
+  public void writeFooter(FileFooter footer, long currentPosition) throws IOException {
+    ThriftWriter thriftWriter = openThriftWriter(filePath);
+    footer.setVersion(VERSION_NUMBER);
+    try {
+      thriftWriter.write(footer);
+      thriftWriter.writeOffset(currentPosition);
+    } finally {
+      thriftWriter.close();
+    }
+  }
+
+  /**
+   * open thrift writer on the fact file for writing the footer
+   *
+   * @param filePath file to open; the writer is created with its append flag set
+   * @return the opened thrift writer
+   * @throws IOException if the file stream cannot be opened
+   */
+  private ThriftWriter openThriftWriter(String filePath) throws IOException {
+    // create thrift writer instance
+    ThriftWriter thriftWriter = new ThriftWriter(filePath, true);
+    // open the file stream
+    thriftWriter.open();
+    return thriftWriter;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
new file mode 100644
index 0000000..bf6fc3b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Writer class which will be used to write thrift objects to the index file
+ */
+public class CarbonIndexFileWriter {
+
+  /**
+   * thrift writer object; null until openThriftWriter has been called
+   */
+  private ThriftWriter thriftWriter;
+
+  /**
+   * It writes thrift object to file. openThriftWriter must have been called before.
+   *
+   * @param indexObject thrift object to be written
+   * @throws IOException throws io exception in case of any failure
+   */
+  public void writeThrift(TBase indexObject) throws IOException {
+    thriftWriter.write(indexObject);
+  }
+
+  /**
+   * Below method will be used to open the thrift writer
+   *
+   * @param filePath file path where data need to be written
+   * @throws IOException throws io exception in case of any failure
+   */
+  public void openThriftWriter(String filePath) throws IOException {
+    // create thrift writer instance
+    thriftWriter = new ThriftWriter(filePath, true);
+    // open the file stream
+    thriftWriter.open();
+  }
+
+  /**
+   * Below method will be used to close the thrift object. It does nothing when the
+   * thrift writer was never opened.
+   */
+  public void close() {
+    if (null != thriftWriter) {
+      thriftWriter.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java b/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
new file mode 100644
index 0000000..61a89f9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.pentaho.di.core.exception.KettleException;
+
+public class HierarchyValueWriterForCSV {
+
+ /**
+ * Comment for <code>LOGGER</code>
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(HierarchyValueWriterForCSV.class.getName());
+ /**
+ * hierarchyName
+ */
+ private String hierarchyName;
+
+ /**
+ * bufferedOutStream
+ */
+ private FileChannel outPutFileChannel;
+
+ /**
+ * storeFolderLocation
+ */
+ private String storeFolderLocation;
+
+ /**
+ * intialized
+ */
+ private boolean intialized;
+
+ /**
+ * counter the number of files.
+ */
+ private int counter;
+
+ /**
+ * byteArrayList
+ */
+ private List<ByteArrayHolder> byteArrayholder =
+ new ArrayList<ByteArrayHolder>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+ /**
+ * toflush
+ */
+ private int toflush;
+
+  /**
+   * Creates a writer for one hierarchy. The flush size is the smaller of the
+   * configured sort size and the configured graph row set size, and the file counter
+   * is derived from the files already present in the store folder.
+   *
+   * @param hierarchy           hierarchy name, used as prefix of the hierarchy files
+   * @param storeFolderLocation folder in which the hierarchy files are kept
+   */
+  public HierarchyValueWriterForCSV(String hierarchy, String storeFolderLocation) {
+    this.hierarchyName = hierarchy;
+    this.storeFolderLocation = storeFolderLocation;
+
+    CarbonProperties properties = CarbonProperties.getInstance();
+    int sortSize = Integer.parseInt(properties
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+    int rowSetSize = Integer.parseInt(properties
+        .getProperty(CarbonCommonConstants.GRAPH_ROWSET_SIZE,
+            CarbonCommonConstants.GRAPH_ROWSET_SIZE_DEFAULT));
+    // never flush more than one row set at a time
+    this.toflush = Math.min(sortSize, rowSetSize);
+
+    updateCounter(hierarchy, storeFolderLocation);
+  }
+
+ /**
+ * @return Returns the byteArrayList. This is the internal list itself, not a copy;
+ * performRequiredOperation sorts, writes and clears it.
+ * @throws KettleException declared by the signature, never thrown by this method
+ */
+ public List<ByteArrayHolder> getByteArrayList() throws KettleException {
+ return byteArrayholder;
+ }
+
+ /**
+ * @return the file channel of the current in-progress hierarchy file; null until the
+ * writer has been initialised by the first write
+ */
+ public FileChannel getBufferedOutStream() {
+ return outPutFileChannel;
+ }
+
+ /**
+ * Derives the file counter from the hierarchy files already present in the store
+ * folder and cleans up in-progress files left behind by an earlier run.
+ * If no matching file exists the counter is 0; otherwise it ends up one above the
+ * value found while scanning the files.
+ *
+ * @param meString            text a file name must contain to be considered
+ * @param storeFolderLocation folder that is scanned
+ */
+ private void updateCounter(final String meString, String storeFolderLocation) {
+ File storeFolder = new File(storeFolderLocation);
+
+ // pick every file whose name contains the hierarchy name
+ File[] listFiles = storeFolder.listFiles(new FileFilter() {
+
+ @Override public boolean accept(File file) {
+ if (file.getName().indexOf(meString) > -1)
+
+ {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ // listFiles is null when the folder cannot be listed
+ if (null == listFiles || listFiles.length == 0) {
+ counter = 0;
+ return;
+ }
+
+ for (File hierFile : listFiles) {
+ String hierFileName = hierFile.getName();
+
+ if (hierFileName.endsWith(CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+ // strip the in-progress extension
+ hierFileName = hierFileName.substring(0, hierFileName.lastIndexOf('.'));
+ try {
+ // only the last character is parsed, so only a one digit suffix is recognised
+ // NOTE(review): this assigns the counter directly and can lower a higher value
+ // taken from a file seen earlier in the loop - confirm this is intended
+ counter = Integer.parseInt(hierFileName.substring(hierFileName.length() - 1));
+ } catch (NumberFormatException nfe) {
+
+ // NOTE(review): this File is built from the bare file name without the store
+ // folder, so it is resolved against the working directory - confirm
+ if (new File(hierFileName + '0' + CarbonCommonConstants.LEVEL_FILE_EXTENSION).exists()) {
+ // Need to skip because the case can come in which server went down while files were
+ // merging and the other hierarchy files were not deleted, and the current file
+ // status is inprogress. so again we will merge the files and rename to normal file
+ LOGGER.info("Need to skip as this can be case in which hierarchy file already renamed");
+ if (hierFile.delete()) {
+ LOGGER.info("Deleted the Inprogress hierarchy Files.");
+ }
+ } else {
+ // levelfileName0.level file not exist that means files is merged and other
+ // files got deleted. while renaming this file from inprogress to normal file,
+ // server got restarted/killed. so we need to rename the file to normal.
+
+ File inprogressFile = new File(storeFolder + File.separator + hierFile.getName());
+ File changetoName = new File(storeFolder + File.separator + hierFileName);
+
+ if (inprogressFile.renameTo(changetoName)) {
+ LOGGER.info(
+ "Renaming the level Files while creating the new instance on server startup.");
+ }
+
+ }
+
+ }
+ }
+
+ // last character of the name (in-progress extension already removed above)
+ String val = hierFileName.substring(hierFileName.length() - 1);
+
+ // 0 when the last character is not a digit
+ int parsedVal = getIntValue(val);
+
+ if (counter < parsedVal) {
+ counter = parsedVal;
+ }
+ }
+ // the next file to be created gets the following number
+ counter++;
+ }
+
+  /**
+   * Parses the given text as an int
+   *
+   * @param val text to parse, the last character of a hierarchy file name
+   * @return the parsed value, or 0 when the text is not a number
+   */
+  private int getIntValue(String val) {
+    int parsedVal = 0;
+    try {
+      parsedVal = Integer.parseInt(val);
+    } catch (NumberFormatException nfe) {
+      // the two message parts were concatenated without a separating space
+      LOGGER.info("Hierarchy File is already renamed so there will not be "
+          + "any need to keep the counter");
+    }
+    return parsedVal;
+  }
+
+  /**
+   * Creates the in-progress hierarchy file for the current counter value, opens a
+   * file channel on it and moves the counter to the next value. The writer is marked
+   * as initialised and the counter is incremented before the file is touched.
+   *
+   * @throws KettleException if the file cannot be created or opened
+   */
+  private void intialize() throws KettleException {
+    intialized = true;
+
+    String inProgressPath = storeFolderLocation + File.separator + hierarchyName + counter
+        + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    File inProgressFile = new File(inProgressPath);
+
+    counter++;
+
+    if (!inProgressFile.exists()) {
+      boolean created;
+      try {
+        created = inProgressFile.createNewFile();
+      } catch (IOException e) {
+        throw new KettleException("unable to create member mapping file", e);
+      }
+      if (!created) {
+        throw new KettleException("unable to create file" + inProgressFile.getAbsolutePath());
+      }
+    }
+
+    FileOutputStream outputStream = null;
+    try {
+      outputStream = new FileOutputStream(inProgressFile);
+      outPutFileChannel = outputStream.getChannel();
+    } catch (FileNotFoundException e) {
+      closeStreamAndDeleteFile(inProgressFile, outPutFileChannel, outputStream);
+      throw new KettleException("member Mapping File not found to write mapping info", e);
+    }
+  }
+
+  /**
+   * Writes one row, the given bytes followed by the primary key as a 4 byte int, to
+   * the current hierarchy file. The file is created on the first call.
+   *
+   * @param bytes      key bytes of the row
+   * @param primaryKey primary key of the row
+   * @throws KettleException if the file cannot be created or the row cannot be written
+   */
+  public void writeIntoHierarchyFile(byte[] bytes, int primaryKey) throws KettleException {
+    if (!intialized) {
+      intialize();
+    }
+
+    ByteBuffer byteBuffer = storeValueInCache(bytes, primaryKey);
+
+    try {
+      byteBuffer.flip();
+      // a single write call is not guaranteed to consume the whole buffer,
+      // so keep writing until nothing is left
+      while (byteBuffer.hasRemaining()) {
+        outPutFileChannel.write(byteBuffer);
+      }
+    } catch (IOException e) {
+      throw new KettleException("Error while writting in the hierarchy mapping file", e);
+    }
+  }
+
+  /**
+   * Builds the buffer for one row: the given bytes followed by the primary key as an
+   * int. The buffer is returned without being flipped; the caller flips it before writing.
+   *
+   * @param bytes      key bytes of the row
+   * @param primaryKey primary key of the row
+   * @return buffer of bytes.length + 4 bytes holding the row
+   */
+  private ByteBuffer storeValueInCache(byte[] bytes, int primaryKey) {
+    // the 4 extra bytes hold the int primary key that is put after the key bytes
+    int rowLength = bytes.length + 4;
+    ByteBuffer row = ByteBuffer.allocate(rowLength);
+    row.put(bytes).putInt(primaryKey);
+    return row;
+  }
+
+ public void performRequiredOperation() throws KettleException {
+ if (byteArrayholder.size() == 0) {
+ return;
+ }
+ //write to the file and close the stream.
+ Collections.sort(byteArrayholder);
+
+ for (ByteArrayHolder byteArray : byteArrayholder) {
+ writeIntoHierarchyFile(byteArray.getMdKey(), byteArray.getPrimaryKey());
+ }
+
+ CarbonUtil.closeStreams(outPutFileChannel);
+
+ //rename the inprogress file to normal .level file
+ String filePath = this.storeFolderLocation + File.separator + hierarchyName + (counter - 1)
+ + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+ File inProgressFile = new File(filePath);
+ String inprogressFileName = inProgressFile.getName();
+
+ String changedFileName = inprogressFileName.substring(0, inprogressFileName.lastIndexOf('.'));
+
+ File orgFinalName = new File(this.storeFolderLocation + File.separator + changedFileName);
+
+ if (!inProgressFile.renameTo(orgFinalName)) {
+ LOGGER.error("Not able to rename file : " + inprogressFileName);
+ }
+
+ //create the new outputStream
+ try {
+ intialize();
+ } catch (KettleException e) {
+ LOGGER.error("Not able to create output stream for file:" + hierarchyName + (counter - 1));
+ }
+
+ //clear the byte array holder also.
+ byteArrayholder.clear();
+ }
+
+  /**
+   * Closes every non-null stream and then deletes the file. Failures to close or to
+   * delete are logged and not propagated.
+   *
+   * @param f       file to delete
+   * @param streams streams to close before deleting; null entries are skipped
+   * @throws KettleException declared by the signature, never thrown by this method
+   */
+  private void closeStreamAndDeleteFile(File f, Closeable... streams) throws KettleException {
+    for (Closeable stream : streams) {
+      if (null == stream) {
+        continue;
+      }
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOGGER.error(e, "unable to close the stream ");
+      }
+    }
+
+    // delete the file
+    if (!f.delete()) {
+      LOGGER.error("Unable to delete the file " + f.getAbsolutePath());
+    }
+  }
+
+ /**
+ * @return the hierarchy name given to the constructor
+ */
+ public String getHierarchyName() {
+ return hierarchyName;
+ }
+
+ /**
+ * @return the file counter, i.e. the number the next in-progress file will get; the
+ * file currently being written, if any, carries counter - 1
+ */
+ public int getCounter() {
+ return counter;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
new file mode 100644
index 0000000..c232fb2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+/**
+ * Simple class that makes it easy to write Thrift objects to disk.
+ *
+ * <p>Expected call order is {@link #open()}, then any number of
+ * {@link #write(TBase)} / {@link #writeOffset(long)} calls, then {@link #close()}.
+ */
+public class ThriftWriter {
+
+  /**
+   * buffer size handed to FileFactory when the output stream is created
+   */
+  private static final int BUFFER_SIZE = 2048;
+
+  /**
+   * File to write to.
+   */
+  private final String fileName;
+
+  /**
+   * flag to append to existing file; passed through to FileFactory on open()
+   */
+  private final boolean append;
+
+  /**
+   * For writing to the file. Stays null until open() has succeeded.
+   */
+  private DataOutputStream dataOutputStream;
+
+  /**
+   * Thrift compact protocol layered on top of dataOutputStream.
+   */
+  private TProtocol binaryOut;
+
+  /**
+   * Constructor. Only records the arguments; no file is touched until open().
+   *
+   * @param fileName path of the file to write to
+   * @param append   true to append to an existing file
+   */
+  public ThriftWriter(String fileName, boolean append) {
+    this.fileName = fileName;
+    this.append = append;
+  }
+
+  /**
+   * Open the file for writing.
+   *
+   * @throws IOException if the output stream cannot be created
+   */
+  public void open() throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    dataOutputStream = FileFactory.getDataOutputStream(fileName, fileType, BUFFER_SIZE, append);
+    binaryOut = new TCompactProtocol(new TIOStreamTransport(dataOutputStream));
+  }
+
+  /**
+   * Write the object to disk and flush the stream.
+   *
+   * @param t thrift object to serialize
+   * @throws IOException if the writer has not been opened, if the stream fails, or if
+   *                     thrift serialization fails (the TException is kept as the cause)
+   */
+  public void write(TBase t) throws IOException {
+    ensureOpen();
+    try {
+      t.write(binaryOut);
+      dataOutputStream.flush();
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Write the offset to the file as a long value.
+   *
+   * @param offset value to write
+   * @throws IOException if the writer has not been opened or the stream fails
+   */
+  public void writeOffset(long offset) throws IOException {
+    ensureOpen();
+    dataOutputStream.writeLong(offset);
+  }
+
+  /**
+   * Close the file stream.
+   */
+  public void close() {
+    CarbonUtil.closeStreams(dataOutputStream);
+  }
+
+  /**
+   * Calls hsync() on the stream when it is an FSDataOutputStream; for any other
+   * stream (or when nothing has been opened) this method does nothing.
+   *
+   * @throws IOException if hsync() fails
+   */
+  public void sync() throws IOException {
+    if (dataOutputStream instanceof FSDataOutputStream) {
+      ((FSDataOutputStream) dataOutputStream).hsync();
+    }
+  }
+
+  /**
+   * Fails with a descriptive IOException instead of a NullPointerException when a
+   * write is attempted before open() has created the stream.
+   */
+  private void ensureOpen() throws IOException {
+    if (null == dataOutputStream) {
+      throw new IOException("ThriftWriter is not open for file: " + fileName);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java b/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
new file mode 100644
index 0000000..2d978fb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer.exception;
+
+import java.util.Locale;
+
+public class CarbonDataWriterException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message. Same value that is handed to the Exception constructor.
+   */
+  private final String msg;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataWriterException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public CarbonDataWriterException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * getLocalizedMessage
+   *
+   * @return whatever the superclass implementation returns
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   *
+   * @return the error message given at construction time
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   * No per-locale text is defined in this class, so the plain error message is
+   * returned for every locale rather than an empty string that hides the error.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - the error message, or an empty string when no message was set.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    String message = getMessage();
+    // keep the never-null result callers had when this method always returned ""
+    return null == message ? "" : message;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
new file mode 100644
index 0000000..e9d7b1d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Contract for writing a dictionary's sort index data and its reverse
+ * (inverted) sort index data.
+ */
+public interface CarbonDictionarySortIndexWriter extends Closeable {
+
+  /**
+   * Writes the dictionary sortIndex data to the column's sortedIndex file
+   * in thrift format.
+   *
+   * @param sortIndexList list of sortIndex
+   * @throws IOException in case any I/O error occurs
+   */
+  void writeSortIndex(List<Integer> sortIndexList) throws IOException;
+
+  /**
+   * Writes the dictionary sortIndexInverted data to the column's sortedIndex
+   * file in thrift format.
+   *
+   * @param invertedSortIndexList list of sortIndexInverted
+   * @throws IOException in case any I/O error occurs
+   */
+  void writeInvertedSortIndex(List<Integer> invertedSortIndexList) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
new file mode 100644
index 0000000..b6df97d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.format.ColumnSortInfo;
+
+/**
+ * The class responsible for writing the dictionary/column sort index and sort index inverted data
+ * in the thrift format.
+ *
+ * <p>{@link #writeSortIndex(List)} and {@link #writeInvertedSortIndex(List)} only store the
+ * lists on an in-memory {@link ColumnSortInfo}; the file itself is written when
+ * {@link #close()} is called.
+ */
+public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySortIndexWriter {
+
+  /**
+   * Logger for this class
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDictionarySortIndexWriterImpl.class.getName());
+
+  /**
+   * carbonTable Identifier holding the info of databaseName and tableName
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * column identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * carbon store location
+   */
+  protected String carbonStorePath;
+
+  /**
+   * Path of dictionary sort index file for which the sortIndex to be written.
+   * Set by initPath() and reset to null once a write attempt has finished.
+   */
+  protected String sortIndexFilePath;
+
+  /**
+   * Column sort info thrift instance that collects both lists until close().
+   */
+  private final ColumnSortInfo columnSortInfo = new ColumnSortInfo();
+
+  /**
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   * @param carbonStorePath       Carbon store path
+   */
+  public CarbonDictionarySortIndexWriterImpl(final CarbonTableIdentifier carbonTableIdentifier,
+      final ColumnIdentifier columnIdentifier, final String carbonStorePath) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    this.carbonStorePath = carbonStorePath;
+  }
+
+  /**
+   * The method is used to populate the dictionary sortIndex data to columnSortInfo
+   * in thrift format. Nothing is written to disk here.
+   *
+   * @param sortIndexList list of sortIndex
+   * @throws IOException declared by the interface; this implementation does no I/O here
+   */
+  @Override public void writeSortIndex(List<Integer> sortIndexList) throws IOException {
+    columnSortInfo.setSort_index(sortIndexList);
+  }
+
+  /**
+   * The method is used to populate the dictionary Inverted sortIndex data to columnSortInfo
+   * in thrift format. Nothing is written to disk here.
+   *
+   * @param invertedSortIndexList list of sortIndexInverted
+   * @throws IOException declared by the interface; this implementation does no I/O here
+   */
+  @Override public void writeInvertedSortIndex(List<Integer> invertedSortIndexList)
+      throws IOException {
+    columnSortInfo.setSort_index_inverted(invertedSortIndexList);
+  }
+
+  /**
+   * Initialize the sortIndexFilePath and open writing stream
+   * for dictionary sortIndex file thrift writer,
+   * write the column sort info to the store when both sort index and sort index
+   * inverted are populated.
+   * The thrift writer is opened without the append flag.
+   * columnSortInfo having null sortIndex or invertedSortIndex will not be written.
+   *
+   * @throws IOException if the containing folder cannot be created or the write fails
+   */
+  private void writeColumnSortInfo() throws IOException {
+    if (null == columnSortInfo.getSort_index()
+        || null == columnSortInfo.getSort_index_inverted()) {
+      return;
+    }
+    initPath();
+    String folderContainingFile = CarbonTablePath.getFolderContainingFile(this.sortIndexFilePath);
+    boolean created = CarbonUtil.checkAndCreateFolder(folderContainingFile);
+    if (!created) {
+      LOGGER.error("Database metadata folder creation status :: " + created);
+      throw new IOException("Failed to created database metadata folder");
+    }
+    // The writer lives only for this call, so it is closed exactly once below and
+    // close() has nothing left to close a second time.
+    ThriftWriter sortIndexThriftWriter = new ThriftWriter(this.sortIndexFilePath, false);
+    try {
+      sortIndexThriftWriter.open();
+      sortIndexThriftWriter.write(columnSortInfo);
+    } catch (IOException ie) {
+      LOGGER.error(ie,
+          "problem while writing the dictionary sort index file.");
+      throw new IOException("problem while writing the dictionary sort index file.", ie);
+    } finally {
+      sortIndexThriftWriter.close();
+      this.sortIndexFilePath = null;
+    }
+  }
+
+  /**
+   * Resolves the sort index file path from the current size of the column's dictionary
+   * file and then triggers the clean up of old sort index files.
+   */
+  protected void initPath() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath = pathService
+        .getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
+    String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    long dictOffset = CarbonUtil.getFileSize(dictionaryPath);
+    this.sortIndexFilePath =
+        carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+    cleanUpOldSortIndex(carbonTablePath, dictionaryPath);
+  }
+
+  /**
+   * It cleans up old unused sortindex files: every sort index file of this column,
+   * except the one that sorts last by name, is deleted when its last modified time is
+   * more than the configured maximum query execution time (in minutes) in the past.
+   * The limit falls back to the default constant when the property cannot be parsed.
+   *
+   * @param carbonTablePath used to list the sort index files of this column
+   * @param dictPath        dictionary file path; its parent folder is the one searched
+   */
+  protected void cleanUpOldSortIndex(CarbonTablePath carbonTablePath, String dictPath) {
+    CarbonFile dictFile =
+        FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath));
+    CarbonFile[] files =
+        carbonTablePath.getSortIndexFiles(dictFile.getParentFile(),
+            columnIdentifier.getColumnId());
+    int maxTime;
+    try {
+      maxTime = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+    } catch (NumberFormatException e) {
+      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+    }
+    if (null == files) {
+      return;
+    }
+    // NOTE(review): ordering is a plain string comparison of file names; if the names
+    // embed a numeric offset this is not numeric order - confirm against the path format.
+    Arrays.sort(files, new Comparator<CarbonFile>() {
+      @Override public int compare(CarbonFile o1, CarbonFile o2) {
+        return o1.getName().compareTo(o2.getName());
+      }
+    });
+    // the last file after sorting is never considered for deletion
+    for (int i = 0; i < files.length - 1; i++) {
+      long difference = System.currentTimeMillis() - files[i].getLastModifiedTime();
+      long minutesElapsed = (difference / (1000 * 60));
+      if (minutesElapsed <= maxTime) {
+        continue;
+      }
+      if (!files[i].delete()) {
+        LOGGER.warn("Failed to delete sortindex file." + files[i].getAbsolutePath());
+      } else {
+        LOGGER.info("Sort index file is deleted." + files[i].getAbsolutePath());
+      }
+    }
+  }
+
+  /**
+   * Writes the collected column sort info to the sort index file (when both lists have
+   * been set) and releases the thrift writer used for it. Each call repeats the write
+   * as long as both lists are set.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    writeColumnSortInfo();
+  }
+}