Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/11 13:25:41 UTC
carbondata git commit: [CARBONDATA-1537] Fixed version compatabilty issues from V1 to latest carbon version
Repository: carbondata
Updated Branches:
refs/heads/master ad25ffc31 -> 133b30391
[CARBONDATA-1537] Fixed version compatabilty issues from V1 to latest carbon version
This closes #1398
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/133b3039
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/133b3039
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/133b3039
Branch: refs/heads/master
Commit: 133b30391a41131d510485ce77f4f1f2bf35900c
Parents: ad25ffc
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Sat Sep 30 17:36:01 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 11 21:25:22 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 17 +---
.../core/datastore/block/TableBlockInfo.java | 13 +++
.../page/encoding/ColumnPageEncoderMeta.java | 34 +------
.../page/encoding/EncodingFactory.java | 14 ++-
.../statistics/PrimitivePageStatsCollector.java | 6 +-
.../blockletindex/BlockletDataMap.java | 95 +++++++++++++++++++-
.../BlockletDataRefNodeWrapper.java | 7 ++
.../core/metadata/ValueEncoderMeta.java | 20 +----
.../core/metadata/blocklet/BlockletInfo.java | 67 ++++++++++++++
.../metadata/blocklet/datachunk/DataChunk.java | 2 +-
.../core/metadata/datatype/DataType.java | 54 ++++++++++-
.../util/AbstractDataFileFooterConverter.java | 7 +-
.../apache/carbondata/core/util/CarbonUtil.java | 87 ++----------------
.../core/util/DataFileFooterConverter.java | 21 ++++-
.../core/util/CarbonMetadataUtilTest.java | 2 +-
.../src/test/resources/testdatafileslist.txt | 4 +-
.../CarbonV1toV3CompatabilityTestCase.scala | 87 ++++++++++++++++++
.../cluster/sdv/suite/SDVSuites.scala | 34 +++++--
.../sql/test/ResourceRegisterAndCopier.scala | 57 +++++++++++-
.../org/apache/spark/sql/CarbonSession.scala | 2 +-
20 files changed, 456 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 36d73d7..c1a3ec7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -696,22 +696,7 @@ public final class CarbonCommonConstants {
* max lru cache size default value in MB
*/
public static final String CARBON_MAX_LRU_CACHE_SIZE_DEFAULT = "-1";
- /**
- * DOUBLE_VALUE_MEASURE
- */
- public static final char DOUBLE_MEASURE = 'n';
- /**
- * BYTE_VALUE_MEASURE
- */
- public static final char BYTE_VALUE_MEASURE = 'c';
- /**
- * BIG_DECIMAL_MEASURE
- */
- public static final char BIG_DECIMAL_MEASURE = 'b';
- /**
- * BIG_INT_MEASURE
- */
- public static final char BIG_INT_MEASURE = 'd';
+
/**
* CARBON_PREFETCH_BUFFERSIZE
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 6389528..910c9bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.datastore.block;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -29,6 +30,9 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
/**
* class will be used to pass the block detail detail will be passed form driver
* to all the executor to load the b+ tree
@@ -162,6 +166,15 @@ public class TableBlockInfo implements Distributable, Serializable {
* @return the blockLength
*/
public long getBlockLength() {
+ if (blockLength == 0) {
+ Path path = new Path(filePath);
+ try {
+ FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+ blockLength = fs.listStatus(path)[0].getLen();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
return blockLength;
}
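
Note: the lazy length fallback added above uses the standard Hadoop FileSystem API. A minimal standalone sketch of the same lookup follows (illustrative only, not from this patch; the commit obtains its Configuration via FileFactory.getConfiguration(), and the class/method names below are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockLengthLookup {
      // Resolve a block's length from the file system when it was not recorded,
      // mirroring the fallback added to getBlockLength() above.
      static long lengthOf(String filePath) throws IOException {
        Path path = new Path(filePath);
        FileSystem fs = path.getFileSystem(new Configuration());
        FileStatus[] statuses = fs.listStatus(path);
        return statuses[0].getLen();
      }
    }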
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 422ce67..3f8fca6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -50,11 +49,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
private int scale;
private int precision;
- public static final char DOUBLE_MEASURE = 'n';
- public static final char STRING = 's';
- public static final char TIMESTAMP = 't';
- public static final char DATE = 'x';
- public static final char BYTE_ARRAY = 'y';
+
public ColumnPageEncoderMeta() {
}
@@ -73,7 +68,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
this.columnSpec = columnSpec;
this.storeDataType = storeDataType;
this.compressorName = compressorName;
- setType(convertType(storeDataType));
+ setType(DataType.convertType(storeDataType));
if (stats != null) {
setDecimal(stats.getDecimalCount());
setMaxValue(stats.getMax());
@@ -83,30 +78,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
}
}
- private char convertType(DataType type) {
- switch (type) {
- case BYTE:
- case SHORT:
- case SHORT_INT:
- case INT:
- case LONG:
- return CarbonCommonConstants.BIG_INT_MEASURE;
- case DOUBLE:
- return CarbonCommonConstants.DOUBLE_MEASURE;
- case DECIMAL:
- return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
- case STRING:
- return STRING;
- case TIMESTAMP:
- return TIMESTAMP;
- case DATE:
- return DATE;
- case BYTE_ARRAY:
- return BYTE_ARRAY;
- default:
- throw new RuntimeException("Unexpected type: " + type);
- }
- }
+
public DataType getStoreDataType() {
return storeDataType;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 9a52183..1cb1613 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.Encoding;
@@ -107,7 +108,7 @@ public abstract class EncodingFactory {
TableSpec.ColumnSpec spec = new TableSpec.ColumnSpec("legacy", stats.getDataType(),
ColumnType.MEASURE);
String compressor = "snappy";
- switch (metadata.getType()) {
+ switch (DataType.getDataType(metadata.getType())) {
case BYTE:
case SHORT:
case INT:
@@ -126,7 +127,7 @@ public abstract class EncodingFactory {
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
- metadata.getType(), stats, compressor);
+ DataType.getDataType(metadata.getType()), stats, compressor);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
@@ -142,7 +143,7 @@ public abstract class EncodingFactory {
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
- metadata.getType(), stats, compressor);
+ DataType.getDataType(metadata.getType()), stats, compressor);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
@@ -152,6 +153,13 @@ public abstract class EncodingFactory {
// no dictionary dimension
return new DirectCompressCodec(stats.getDataType()).createDecoder(
new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+ // In case of older versions like in V1 format it has special datatype to handle
+ case LEGACY_LONG:
+ AdaptiveIntegralCodec adaptiveCodec =
+ new AdaptiveIntegralCodec(DataType.LONG, DataType.LONG, stats);
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ adaptiveCodec.getTargetDataType(), stats, compressor);
+ return adaptiveCodec.createDecoder(meta);
default:
throw new RuntimeException("unsupported data type: " + stats.getDataType());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 2f6178b..9490b93 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -92,9 +92,9 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) {
PrimitivePageStatsCollector instance =
- new PrimitivePageStatsCollector(meta.getType(), -1, -1);
+ new PrimitivePageStatsCollector(DataType.getDataType(meta.getType()), -1, -1);
// set min max from meta
- switch (meta.getType()) {
+ switch (DataType.getDataType(meta.getType())) {
case BYTE:
instance.minByte = (byte) meta.getMinValue();
instance.maxByte = (byte) meta.getMaxValue();
@@ -107,6 +107,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
instance.minInt = (int) meta.getMinValue();
instance.maxInt = (int) meta.getMaxValue();
break;
+ case LEGACY_LONG:
case LONG:
instance.minLong = (long) meta.getMinValue();
instance.maxLong = (long) meta.getMaxValue();
@@ -145,6 +146,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
minInt = Integer.MAX_VALUE;
maxInt = Integer.MIN_VALUE;
break;
+ case LEGACY_LONG:
case LONG:
minLong = Long.MAX_VALUE;
maxLong = Long.MIN_VALUE;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index adcb1a6..66d07dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,12 +51,14 @@ import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.DataTypeUtil;
/**
* Datamap implementation for blocklet.
@@ -133,9 +136,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
- row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal],
+ updateMinValues(minMaxIndex.getMinValues(), minMaxLen)), ordinal);
ordinal++;
- row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal],
+ updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen)), ordinal);
ordinal++;
row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
@@ -169,6 +174,92 @@ public class BlockletDataMap implements DataMap, Cacheable {
}
}
+ /**
+ * Fill the measures min values with minimum , this is needed for backward version compatability
+ * as older versions don't store min values for measures
+ */
+ private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
+ byte[][] updatedValues = minValues;
+ if (minValues.length < minMaxLen.length) {
+ updatedValues = new byte[minMaxLen.length][];
+ System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
+ List<CarbonMeasure> measures = segmentProperties.getMeasures();
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ for (int i = 0; i < measures.size(); i++) {
+ buffer.rewind();
+ switch (measures.get(i).getDataType()) {
+ case BYTE:
+ buffer.putLong(Byte.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ break;
+ case SHORT:
+ buffer.putLong(Short.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ break;
+ case INT:
+ buffer.putLong(Integer.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ break;
+ case LONG:
+ buffer.putLong(Long.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ break;
+ case DECIMAL:
+ updatedValues[minValues.length + i] =
+ DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
+ break;
+ default:
+ buffer.putDouble(Double.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ }
+ }
+ }
+ return updatedValues;
+ }
+
+ /**
+ * Fill the measures max values with maximum , this is needed for backward version compatability
+ * as older versions don't store max values for measures
+ */
+ private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
+ byte[][] updatedValues = maxValues;
+ if (maxValues.length < minMaxLen.length) {
+ updatedValues = new byte[minMaxLen.length][];
+ System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
+ List<CarbonMeasure> measures = segmentProperties.getMeasures();
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ for (int i = 0; i < measures.size(); i++) {
+ buffer.rewind();
+ switch (measures.get(i).getDataType()) {
+ case BYTE:
+ buffer.putLong(Byte.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ break;
+ case SHORT:
+ buffer.putLong(Short.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ break;
+ case INT:
+ buffer.putLong(Integer.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ break;
+ case LONG:
+ buffer.putLong(Long.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ break;
+ case DECIMAL:
+ updatedValues[maxValues.length + i] =
+ DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
+ break;
+ default:
+ buffer.putDouble(Double.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ }
+ }
+ }
+ return updatedValues;
+ }
+
private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
DataMapSchema[] minSchemas =
((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
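
Note: the two helpers above pad the min/max arrays with sentinel values for measures, because V1 footers do not store measure min/max. A small sketch of the fixed 8-byte encoding they rely on (illustrative only; the class and method names are hypothetical):

    import java.nio.ByteBuffer;

    public class DefaultMeasureMinMax {
      // Integral measures are packed as an 8-byte long and floating-point
      // measures as an 8-byte double, matching the fixed-size buffer used above.
      static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
      }

      static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(8).putDouble(value).array();
      }

      public static void main(String[] args) {
        byte[] minForIntMeasure = encodeLong(Integer.MIN_VALUE);
        byte[] maxForIntMeasure = encodeLong(Integer.MAX_VALUE);
        System.out.println(minForIntMeasure.length + ", " + maxForIntMeasure.length); // 8, 8
      }
    }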
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
index 8896e5d..5e0f4cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
/**
@@ -46,6 +47,12 @@ public class BlockletDataRefNodeWrapper implements DataRefNode {
public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
int[] dimensionLens) {
this.blockInfos = blockInfos;
+ // Update row count and page count to blocklet info
+ for (TableBlockInfo blockInfo: blockInfos) {
+ BlockletDetailInfo detailInfo = blockInfo.getDetailInfo();
+ detailInfo.getBlockletInfo().setNumberOfRows(detailInfo.getRowCount());
+ detailInfo.getBlockletInfo().setNumberOfPages(detailInfo.getPagesCount());
+ }
this.index = index;
this.dimensionLens = dimensionLens;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
index a7383ea..4a9007c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
@@ -19,9 +19,6 @@ package org.apache.carbondata.core.metadata;
import java.io.Serializable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
/**
* DO NOT MODIFY THIS CLASS AND PACKAGE NAME, BECAUSE
* IT IS SERIALIZE TO STORE
@@ -29,8 +26,6 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
*/
public class ValueEncoderMeta implements Serializable {
- private static final long serialVersionUID = -0L;
-
/**
* maxValue
*/
@@ -83,20 +78,7 @@ public class ValueEncoderMeta implements Serializable {
this.decimal = decimal;
}
- public DataType getType() {
- switch (type) {
- case CarbonCommonConstants.BIG_INT_MEASURE:
- return DataType.LONG;
- case CarbonCommonConstants.DOUBLE_MEASURE:
- return DataType.DOUBLE;
- case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
- return DataType.DECIMAL;
- default:
- throw new RuntimeException("Unexpected type: " + type);
- }
- }
-
- public char getTypeInChar() {
+ public char getType() {
return type;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index f81f805..19a4923 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,9 +17,13 @@
package org.apache.carbondata.core.metadata.blocklet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -214,6 +218,49 @@ public class BlockletInfo implements Serializable, Writable {
for (int i = 0; i < mSize; i++) {
output.writeInt(measureChunksLength.get(i));
}
+ writeChunkInfoForOlderVersions(output);
+
+ }
+
+ /**
+ * Serialize datachunks as well for older versions like V1 and V2
+ */
+ private void writeChunkInfoForOlderVersions(DataOutput output) throws IOException {
+ int dimChunksSize = dimensionColumnChunk != null ? dimensionColumnChunk.size() : 0;
+ output.writeShort(dimChunksSize);
+ for (int i = 0; i < dimChunksSize; i++) {
+ byte[] bytes = serializeDataChunk(dimensionColumnChunk.get(i));
+ output.writeInt(bytes.length);
+ output.write(bytes);
+ }
+ int msrChunksSize = measureColumnChunk != null ? measureColumnChunk.size() : 0;
+ output.writeShort(msrChunksSize);
+ for (int i = 0; i < msrChunksSize; i++) {
+ byte[] bytes = serializeDataChunk(measureColumnChunk.get(i));
+ output.writeInt(bytes.length);
+ output.write(bytes);
+ }
+ }
+
+ private byte[] serializeDataChunk(DataChunk chunk) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ ObjectOutputStream outputStream = new ObjectOutputStream(stream);
+ outputStream.writeObject(chunk);
+ outputStream.close();
+ return stream.toByteArray();
+ }
+
+ private DataChunk deserializeDataChunk(byte[] bytes) throws IOException {
+ ByteArrayInputStream stream = new ByteArrayInputStream(bytes);
+ ObjectInputStream inputStream = new ObjectInputStream(stream);
+ DataChunk dataChunk = null;
+ try {
+ dataChunk = (DataChunk) inputStream.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ inputStream.close();
+ return dataChunk;
}
@Override public void readFields(DataInput input) throws IOException {
@@ -238,6 +285,26 @@ public class BlockletInfo implements Serializable, Writable {
for (int i = 0; i < measureChunkOffsetsSize; i++) {
measureChunksLength.add(input.readInt());
}
+ readChunkInfoForOlderVersions(input);
+ }
+ /**
+ * Deserialize datachunks as well for older versions like V1 and V2
+ */
+ private void readChunkInfoForOlderVersions(DataInput input) throws IOException {
+ short dimChunksSize = input.readShort();
+ dimensionColumnChunk = new ArrayList<>(dimChunksSize);
+ for (int i = 0; i < dimChunksSize; i++) {
+ byte[] bytes = new byte[input.readInt()];
+ input.readFully(bytes);
+ dimensionColumnChunk.add(deserializeDataChunk(bytes));
+ }
+ short msrChunksSize = input.readShort();
+ measureColumnChunk = new ArrayList<>(msrChunksSize);
+ for (int i = 0; i < msrChunksSize; i++) {
+ byte[] bytes = new byte[input.readInt()];
+ input.readFully(bytes);
+ measureColumnChunk.add(deserializeDataChunk(bytes));
+ }
}
}
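
Note: the read/write helpers added above embed each legacy DataChunk as a length-prefixed, Java-serialized byte block. A self-contained sketch of that pattern with plain java.io (illustrative only; the helper class name is hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class LengthPrefixedObjectIO {
      // Write a Serializable object as a length-prefixed byte block, the way
      // writeChunkInfoForOlderVersions() embeds each DataChunk in the stream.
      static void write(DataOutput out, Serializable obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
          oos.writeObject(obj);
        }
        byte[] bytes = buffer.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
      }

      // Read it back: length first, then the serialized object.
      static Object read(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        try (ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bytes))) {
          return ois.readObject();
        } catch (ClassNotFoundException e) {
          throw new IOException(e);
        }
      }
    }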
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
index a85706b..528b261 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
@@ -51,7 +51,7 @@ public class DataChunk implements Serializable {
/**
* information about presence of values in each row of this column chunk
*/
- private transient BitSet nullValueIndexForColumn;
+ private BitSet nullValueIndexForColumn;
/**
* offset of row id page, only if encoded using inverted index
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index e97cce0..a37f265 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -34,11 +34,21 @@ public enum DataType {
STRUCT(10, "STRUCT", -1),
MAP(11, "MAP", -1),
BYTE(12, "BYTE", 1),
-
// internal use only, for variable length data type
BYTE_ARRAY(13, "BYTE_ARRAY", -1),
// internal use only, for value compression from integer/long to 3 bytes value
- SHORT_INT(14, "SHORT_INT", 3);
+ SHORT_INT(14, "SHORT_INT", 3),
+ // Only for internal use for backward compatability. It is only used for V1 version
+ LEGACY_LONG(15, "LEGACYBIGINT", 8);
+
+ public static final char DOUBLE_MEASURE_CHAR = 'n';
+ public static final char STRING_CHAR = 's';
+ public static final char TIMESTAMP_CHAR = 't';
+ public static final char DATE_CHAR = 'x';
+ public static final char BYTE_ARRAY_CHAR = 'y';
+ public static final char BYTE_VALUE_MEASURE_CHAR = 'c';
+ public static final char BIG_DECIMAL_MEASURE_CHAR = 'b';
+ public static final char BIG_INT_MEASURE_CHAR = 'd';
private int precedenceOrder;
private String name;
@@ -114,4 +124,44 @@ public enum DataType {
throw new RuntimeException("create DataType with invalid ordinal: " + ordinal);
}
}
+
+ public static char convertType(DataType type) {
+ switch (type) {
+ case BYTE:
+ case SHORT:
+ case SHORT_INT:
+ case INT:
+ case LONG:
+ return BIG_INT_MEASURE_CHAR;
+ case DOUBLE:
+ return DOUBLE_MEASURE_CHAR;
+ case DECIMAL:
+ return BIG_DECIMAL_MEASURE_CHAR;
+ case STRING:
+ return STRING_CHAR;
+ case TIMESTAMP:
+ return TIMESTAMP_CHAR;
+ case DATE:
+ return DATE_CHAR;
+ case BYTE_ARRAY:
+ return BYTE_ARRAY_CHAR;
+ default:
+ throw new RuntimeException("Unexpected type: " + type);
+ }
+ }
+
+ public static DataType getDataType(char type) {
+ switch (type) {
+ case BIG_INT_MEASURE_CHAR:
+ return DataType.LONG;
+ case DOUBLE_MEASURE_CHAR:
+ return DataType.DOUBLE;
+ case BIG_DECIMAL_MEASURE_CHAR:
+ return DataType.DECIMAL;
+ case 'l':
+ return DataType.LEGACY_LONG;
+ default:
+ throw new RuntimeException("Unexpected type: " + type);
+ }
+ }
}
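
Note: the char mapping added here replaces the constants removed from CarbonCommonConstants and the convertType() dropped from ColumnPageEncoderMeta. A small usage sketch, assuming the DataType enum from this hunk is on the classpath (the wrapper class is hypothetical):

    import org.apache.carbondata.core.metadata.datatype.DataType;

    public class LegacyTypeCodeExample {
      public static void main(String[] args) {
        // All integral store types collapse to the legacy BIG_INT code 'd'.
        char code = DataType.convertType(DataType.INT);
        System.out.println(code);                       // d
        // Reading 'd' back yields LONG, the widest integral type.
        System.out.println(DataType.getDataType(code)); // LONG
        // V1 stores may carry the special 'l' code, mapped to LEGACY_LONG.
        System.out.println(DataType.getDataType('l'));  // LEGACY_LONG
      }
    }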
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 19ead44..c1cf8e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -156,7 +156,12 @@ public abstract class AbstractDataFileFooterConverter {
tableBlockInfo.setVersion(version);
int blockletSize = getBlockletSize(readBlockIndexInfo);
tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
- tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+ String fileName = readBlockIndexInfo.file_name;
+ // Take only name of file.
+ if (fileName.lastIndexOf("/") > 0) {
+ fileName = fileName.substring(fileName.lastIndexOf("/"));
+ }
+ tableBlockInfo.setFilePath(parentPath + "/" + fileName);
dataFileFooter.setBlockletIndex(blockletIndex);
dataFileFooter.setColumnInTable(columnSchemaList);
dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 683633f..d0171b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -922,7 +922,8 @@ public final class CarbonUtil {
ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
AbstractDataFileFooterConverter dataFileFooterConverter =
DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
- fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+ List<ColumnSchema> schema = dataFileFooterConverter.getSchema(tableBlockInfo);
+ fileFooter.setColumnInTable(schema);
SegmentInfo segmentInfo = new SegmentInfo();
segmentInfo.setColumnCardinality(detailInfo.getDimLens());
segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
@@ -1446,17 +1447,17 @@ public final class CarbonUtil {
ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
valueEncoderMeta.setType(measureType);
switch (measureType) {
- case CarbonCommonConstants.DOUBLE_MEASURE:
+ case DataType.DOUBLE_MEASURE_CHAR:
valueEncoderMeta.setMaxValue(buffer.getDouble());
valueEncoderMeta.setMinValue(buffer.getDouble());
valueEncoderMeta.setUniqueValue(buffer.getDouble());
break;
- case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+ case DataType.BIG_DECIMAL_MEASURE_CHAR:
valueEncoderMeta.setMaxValue(BigDecimal.valueOf(Long.MAX_VALUE));
valueEncoderMeta.setMinValue(BigDecimal.valueOf(Long.MIN_VALUE));
valueEncoderMeta.setUniqueValue(BigDecimal.valueOf(Long.MIN_VALUE));
break;
- case CarbonCommonConstants.BIG_INT_MEASURE:
+ case DataType.BIG_INT_MEASURE_CHAR:
valueEncoderMeta.setMaxValue(buffer.getLong());
valueEncoderMeta.setMinValue(buffer.getLong());
valueEncoderMeta.setUniqueValue(buffer.getLong());
@@ -1469,40 +1470,6 @@ public final class CarbonUtil {
return valueEncoderMeta;
}
- public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta)
- throws IOException {
- ByteBuffer buffer = null;
- switch (valueEncoderMeta.getType()) {
- case LONG:
- buffer = ByteBuffer.allocate(
- (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
- + 3);
- buffer.putChar(valueEncoderMeta.getTypeInChar());
- buffer.putLong((Long) valueEncoderMeta.getMaxValue());
- buffer.putLong((Long) valueEncoderMeta.getMinValue());
- buffer.putLong(0L); // unique value, not used
- break;
- case DOUBLE:
- buffer = ByteBuffer.allocate(
- (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
- + 3);
- buffer.putChar(valueEncoderMeta.getTypeInChar());
- buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
- buffer.putDouble((Double) valueEncoderMeta.getMinValue());
- buffer.putDouble(0d); // unique value, not used
- break;
- case DECIMAL:
- buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
- buffer.putChar(valueEncoderMeta.getTypeInChar());
- break;
- default:
- throw new IOException("Unsupported datatype: " + valueEncoderMeta.getType());
- }
- buffer.putInt(0); // decimal point, not used
- buffer.put(valueEncoderMeta.getDataTypeSelected());
- buffer.flip();
- return buffer.array();
- }
/**
* Below method will be used to convert indexes in range
@@ -1983,49 +1950,6 @@ public final class CarbonUtil {
}
}
- public static byte[] getMaxValueAsBytes(ValueEncoderMeta meta) {
- ByteBuffer b;
- switch (meta.getType()) {
- case LONG:
- b = ByteBuffer.allocate(8);
- b.putLong((long) meta.getMaxValue());
- b.flip();
- return b.array();
- case DOUBLE:
- b = ByteBuffer.allocate(8);
- b.putDouble((double) meta.getMaxValue());
- b.flip();
- return b.array();
- case DECIMAL:
- case BYTE_ARRAY:
- return new byte[8];
- default:
- throw new IllegalArgumentException("Invalid data type: " + meta.getType());
- }
- }
-
- public static byte[] getMinValueAsBytes(ValueEncoderMeta meta) {
- ByteBuffer b;
- switch (meta.getType()) {
- case LONG:
- b = ByteBuffer.allocate(8);
- b.putLong((long) meta.getMinValue());
- b.flip();
- return b.array();
- case DOUBLE:
- b = ByteBuffer.allocate(8);
- b.putDouble((double) meta.getMinValue());
- b.flip();
- return b.array();
- case DECIMAL:
- case BYTE_ARRAY:
- return new byte[8];
- default:
- throw new IllegalArgumentException("Invalid data type: " + meta.getType());
- }
- }
-
-
/**
* convert value to byte array
*/
@@ -2070,6 +1994,5 @@ public final class CarbonUtil {
}
}
-
}
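
Note: the legacy measure metadata parsed above is a small fixed layout (a 2-byte type char, then max/min/unique values of 8 bytes each for long or double measures, a 4-byte decimal count and a 1-byte data-type-selected flag), as also reflected in the removed serializeEncodeMetaUsingByteBuffer(). A hedged sketch of inspecting such a blob, with hypothetical class and method names:

    import java.nio.ByteBuffer;

    import org.apache.carbondata.core.metadata.datatype.DataType;

    public class LegacyMeasureMetaSketch {
      // Peek at the type char and the max/min of a legacy measure metadata blob.
      static void dump(byte[] blob) {
        ByteBuffer buffer = ByteBuffer.wrap(blob);
        char type = buffer.getChar();
        if (type == DataType.BIG_INT_MEASURE_CHAR) {          // 'd'
          System.out.println("max=" + buffer.getLong() + " min=" + buffer.getLong());
        } else if (type == DataType.DOUBLE_MEASURE_CHAR) {     // 'n'
          System.out.println("max=" + buffer.getDouble() + " min=" + buffer.getDouble());
        }
      }
    }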
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 3ac6987..e61b477 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -123,6 +123,25 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
}
@Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
- return null;
+ FileHolder fileReader = null;
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ try {
+ long completeBlockLength = tableBlockInfo.getBlockLength();
+ long footerPointer = completeBlockLength - 8;
+ fileReader = FileFactory.getFileHolder(FileFactory.getFileType(tableBlockInfo.getFilePath()));
+ long actualFooterOffset = fileReader.readLong(tableBlockInfo.getFilePath(), footerPointer);
+ CarbonFooterReader reader =
+ new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset);
+ FileFooter footer = reader.readFooter();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ } finally {
+ if (null != fileReader) {
+ fileReader.finish();
+ }
+ }
+ return columnSchemaList;
}
}
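
Note: getSchema() above locates the footer by reading the 8 bytes at the end of the block, which hold the offset where the thrift FileFooter starts. A minimal sketch of that lookup with plain java.io (illustrative only; the commit uses carbon's FileHolder/FileFactory abstractions instead, and the class name is hypothetical):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class FooterOffsetReader {
      // The footer pointer lives in the last 8 bytes of the data file; the
      // value it holds is the position where the footer begins.
      static long footerOffset(String filePath) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
          file.seek(file.length() - 8);
          return file.readLong();
        }
      }
    }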
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 35b45ca..d4bd06b 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -123,7 +123,7 @@ public class CarbonMetadataUtilTest {
meta.setDecimal(5);
meta.setMinValue(objMinArr);
meta.setMaxValue(objMaxArr);
- meta.setType(ColumnPageEncoderMeta.DOUBLE_MEASURE);
+ meta.setType(org.apache.carbondata.core.metadata.datatype.DataType.DOUBLE_MEASURE_CHAR);
List<Encoding> encoders = new ArrayList<>();
encoders.add(Encoding.INVERTED_INDEX);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt
index 08ff519..959ff6b 100644
--- a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt
+++ b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt
@@ -231,4 +231,6 @@ Data/badrecords_4.csv
Data/badrecords_5.csv
Data/emptyLoad.csv
Data/splchar.csv
-source.csv
\ No newline at end of file
+source.csv
+Data/v1_version/metastore_db.zip
+Data/v1_version/store.zip
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
new file mode 100644
index 0000000..d737092
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * V1 to V3 compatability test. This test has to be at last
+ */
+class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll {
+
+ var localspark: CarbonSession = null
+ val storeLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version/store"
+ val metaLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version"
+
+ override def beforeAll {
+ sqlContext.sparkSession.stop()
+ CarbonEnv.carbonEnvMap.clear()
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+ import org.apache.spark.sql.CarbonSession._
+ println(s"store path for CarbonV1toV3CompatabilityTestCase is $storeLocation and metastore is $metaLocation")
+ localspark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonV1toV3CompatabilityTestCase")
+ .config("spark.driver.host", "localhost")
+ .getOrCreateCarbonSession(storeLocation, metaLocation).asInstanceOf[CarbonSession]
+ println("store path from env : " + CarbonEnv.getInstance(localspark).storePath)
+ localspark.sparkContext.setLogLevel("WARN")
+ localspark.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
+ s"('tablePath'='$storeLocation/default/t3')")
+ localspark.sql("show tables").show()
+ }
+
+ test("test v1 to v3 compatabilty reading all with out fail") {
+ val length = localspark.sql("select ID,date,country,name,phonetype,serialname,salary from t3")
+ .collect().length
+ assert(length == 1000)
+ }
+
+ test("test v1 to v3 compatabilty groupy by query") {
+ val dataFrame = localspark
+ .sql(s"SELECT country, count(salary) AS amount FROM t3 WHERE country IN ('china','france') " +
+ s"GROUP BY country")
+ checkAnswer(dataFrame, Seq(Row("china", 849), Row("france", 101)))
+ }
+
+ test("test v1 to v3 compatabilty filter on measure with int measure") {
+ val dataFrame = localspark
+ .sql(s"SELECT sum(salary) FROM t3 where salary > 15408")
+ checkAnswer(dataFrame, Seq(Row(9281064)))
+ }
+
+ test("test v1 to v3 compatabilty filter on measure with decimal dimension") {
+ val dataFrame = localspark
+ .sql(s"SELECT sum(salary2) FROM t3 where salary2 > 15408")
+ checkAnswer(dataFrame, Seq(Row(9281064)))
+ }
+
+ override def afterAll {
+ localspark.stop()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index d4efedb..049a460 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -70,15 +70,15 @@ class SDVSuites extends Suites with BeforeAndAfterAll {
*/
class SDVSuites1 extends Suites with BeforeAndAfterAll {
- val suites = new BadRecordTestCase ::
- new BatchSortLoad1TestCase ::
- new BatchSortQueryTestCase ::
- new DataLoadingTestCase ::
- new OffheapSort2TestCase ::
- new PartitionTestCase ::
- new QueriesBasicTestCase ::
- new BatchSortLoad3TestCase ::
- new GlobalSortTestCase :: Nil
+ val suites = new BadRecordTestCase ::
+ new BatchSortLoad1TestCase ::
+ new BatchSortQueryTestCase ::
+ new DataLoadingTestCase ::
+ new OffheapSort2TestCase ::
+ new PartitionTestCase ::
+ new QueriesBasicTestCase ::
+ new BatchSortLoad3TestCase ::
+ new GlobalSortTestCase :: Nil
override val nestedSuites = suites.toIndexedSeq
@@ -142,3 +142,19 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
println("---------------- Stopped spark -----------------")
}
}
+
+/**
+ * Suite class for compatabiity tests
+ */
+class SDVSuites4 extends Suites with BeforeAndAfterAll {
+
+ val suites = new CarbonV1toV3CompatabilityTestCase :: Nil
+
+ override val nestedSuites = suites.toIndexedSeq
+
+ override protected def afterAll() = {
+ println("---------------- Stopping spark -----------------")
+ TestQueryExecutor.INSTANCE.stop()
+ println("---------------- Stopped spark -----------------")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index b99884d..d668ccb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -16,8 +16,9 @@
*/
package org.apache.spark.sql.test
-import java.io.{BufferedReader, File, FileReader}
+import java.io._
import java.net.URL
+import java.util.zip.ZipFile
import scala.collection.mutable.ArrayBuffer
@@ -57,15 +58,30 @@ object ResourceRegisterAndCopier {
resources.foreach { file =>
val hdfsDataPath = hdfsPath + "/" + file
val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType)
+ val target = resourcePath + "/" + file
if (!rsFile.exists()) {
- val target = resourcePath + "/" + file
if (file.lastIndexOf("/") > -1) {
new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs()
}
downloadFile(link, file, target)
// copy it
copyLocalFile(hdfsDataPath, target)
+ // Unzip the zip file to local directory
+ if (target.endsWith("zip")) {
+ unzip(target, new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/")))
+ .getAbsolutePath)
+ }
new File(target).delete()
+ } else if (target.endsWith("zip")) {
+ if (new File(target).exists()) {
+ FileFactory.deleteAllFilesOfDir(new File(target))
+ }
+ if (file.lastIndexOf("/") > -1) {
+ new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/"))).mkdirs()
+ }
+ downloadFile(link, file, target)
+ unzip(target, new File(resourcePath + "/" + file.substring(0, file.lastIndexOf("/")))
+ .getAbsolutePath)
}
}
}
@@ -143,4 +159,41 @@ object ResourceRegisterAndCopier {
input.close()
}
+ private def unzip(zipFilePath: String, destDir: String) = {
+ LOGGER.info(s"Uncompressing $zipFilePath to the directory $destDir")
+ try {
+ val zipFile = new ZipFile(zipFilePath)
+ val enu = zipFile.entries
+ while ( { enu.hasMoreElements }) {
+ val zipEntry = enu.nextElement
+ val name = destDir + "/" + zipEntry.getName
+ val size = zipEntry.getSize
+ val compressedSize = zipEntry.getCompressedSize
+ val file = new File(name)
+ if (name.endsWith("/")) {
+ file.mkdirs
+ } else {
+ val parent = file.getParentFile
+ if (parent != null) {
+ parent.mkdirs
+ }
+ val is = zipFile.getInputStream(zipEntry)
+ val fos = new FileOutputStream(file)
+ val bytes = new Array[Byte](1024)
+ var length = is.read(bytes)
+ while (length >= 0) {
+ fos.write(bytes, 0, length)
+ length = is.read(bytes)
+ }
+ is.close
+ fos.close()
+ }
+ }
+ zipFile.close
+ } catch {
+ case e: IOException =>
+ e.printStackTrace()
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/133b3039/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 2c2b1cf..eeca8b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -43,7 +43,7 @@ class CarbonSession(@transient val sc: SparkContext,
}
@transient
- override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+ override lazy val sessionState: SessionState = new CarbonSessionState(this)
/**
* State shared across sessions, including the `SparkContext`, cached data, listener,