You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/11/05 06:14:34 UTC
carbondata git commit: [CARBONDATA-3057] Implement VectorizedReader
for SDK Reader
Repository: carbondata
Updated Branches:
refs/heads/master 5a0bc6e71 -> 63a28a951
[CARBONDATA-3057] Implement VectorizedReader for SDK Reader
1. Added carbondata file listing for getting splits to avoid block/blocklet datamap
loading when a filter expression is not provided by the user
2. Implemented Vectorized reader, exposes a property to switch between record reader/vector reader.
This closes #2869
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/63a28a95
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/63a28a95
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/63a28a95
Branch: refs/heads/master
Commit: 63a28a951ed552680da1a5047f5937fb90a8d76d
Parents: 5a0bc6e
Author: kunal642 <ku...@gmail.com>
Authored: Fri Oct 26 11:43:22 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Mon Nov 5 11:49:24 2018 +0530
----------------------------------------------------------------------
...feVariableLengthDimensionDataChunkStore.java | 5 +-
.../filesystem/AbstractDFSCarbonFile.java | 26 +++
.../core/datastore/filesystem/CarbonFile.java | 8 +
.../datastore/filesystem/LocalCarbonFile.java | 23 ++
.../encoding/compress/DirectCompressCodec.java | 7 +-
.../core/metadata/datatype/DecimalType.java | 2 +-
.../core/metadata/datatype/StructType.java | 2 +-
.../vector/impl/CarbonColumnVectorImpl.java | 18 +-
docs/sdk-guide.md | 8 +
.../carbondata/hadoop/CarbonRecordReader.java | 15 ++
.../hadoop/api/CarbonFileInputFormat.java | 55 ++++-
.../util/CarbonVectorizedRecordReader.java | 211 +++++++++++++++++++
.../sdk/file/CarbonReaderBuilder.java | 36 +++-
.../sdk/file/CSVCarbonWriterTest.java | 4 +-
.../carbondata/sdk/file/CarbonReaderTest.java | 140 ++++++++++--
15 files changed, 519 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 2873eed..01db383 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -163,13 +163,14 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
}
DataType dt = vector.getType();
- if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+ if (((!(dt == DataTypes.STRING) && !(dt == DataTypes.VARCHAR)) && length == 0)
+ || ByteUtil.UnsafeComparer.INSTANCE
.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentDataOffset,
length)) {
vector.putNull(vectorRow);
} else {
- if (dt == DataTypes.STRING) {
+ if (dt == DataTypes.STRING || dt == DataTypes.VARCHAR) {
vector.putByteArray(vectorRow, currentDataOffset, length, data);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 24efb70..d56caac 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -524,6 +524,27 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
return getFiles(listStatus);
}
+ /**
+ * Method used to list files recursively and apply file filter on the result.
+ *
+ */
+ @Override
+ public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter)
+ throws IOException {
+ List<CarbonFile> carbonFiles = new ArrayList<>();
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ RemoteIterator<LocatedFileStatus> listStatus = fs.listFiles(fileStatus.getPath(), recursive);
+ while (listStatus.hasNext()) {
+ LocatedFileStatus locatedFileStatus = listStatus.next();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(locatedFileStatus.getPath().toString());
+ if (fileFilter.accept(carbonFile)) {
+ carbonFiles.add(carbonFile);
+ }
+ }
+ }
+ return carbonFiles;
+ }
+
@Override
public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
if (null != fileStatus && fileStatus.isDirectory()) {
@@ -584,4 +605,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
return fs.getDefaultReplication(path);
}
+
+ @Override
+ public long getLength() {
+ return fileStatus.getLen();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 8044817..ce50259 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -38,6 +38,8 @@ public interface CarbonFile {
List<CarbonFile> listFiles(Boolean recurssive) throws IOException;
+ List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) throws IOException;
+
/**
* It returns list of files with location details.
* @return
@@ -179,4 +181,10 @@ public interface CarbonFile {
* @throws IOException if error occurs
*/
short getDefaultReplication(String filePath) throws IOException;
+
+ /**
+ * Get the length of this file, in bytes.
+ * @return the length of this file, in bytes.
+ */
+ long getLength();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 620a19c..98d61f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -183,6 +183,25 @@ public class LocalCarbonFile implements CarbonFile {
return carbonFiles;
}
+ @Override public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter)
+ throws IOException {
+ if (!file.isDirectory()) {
+ return new ArrayList<CarbonFile>();
+ }
+ Collection<File> fileCollection = FileUtils.listFiles(file, null, recursive);
+ if (fileCollection == null) {
+ return new ArrayList<CarbonFile>();
+ }
+ List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>();
+ for (File file : fileCollection) {
+ CarbonFile carbonFile = new LocalCarbonFile(file);
+ if (fileFilter.accept(carbonFile)) {
+ carbonFiles.add(carbonFile);
+ }
+ }
+ return carbonFiles;
+ }
+
@Override public boolean createNewFile() {
try {
return file.createNewFile();
@@ -479,4 +498,8 @@ public class LocalCarbonFile implements CarbonFile {
public short getDefaultReplication(String filePath) throws IOException {
return 1;
}
+
+ @Override public long getLength() {
+ return file.length();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 1825850..fd94344 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -329,9 +329,6 @@ public class DirectCompressCodec implements ColumnPageCodec {
} else if (pageDataType == DataTypes.LONG) {
long[] longData = columnPage.getLongPage();
if (vectorDataType == DataTypes.LONG) {
- for (int i = 0; i < pageSize; i++) {
- vector.putLong(i, longData[i]);
- }
vector.putLongs(0, pageSize, longData, 0);
} else if (vectorDataType == DataTypes.TIMESTAMP) {
for (int i = 0; i < pageSize; i++) {
@@ -347,9 +344,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
columnPage.getNullBits());
} else if (vectorDataType == DataTypes.FLOAT) {
float[] floatPage = columnPage.getFloatPage();
- for (int i = 0; i < pageSize; i++) {
- vector.putFloats(0, pageSize, floatPage, 0);
- }
+ vector.putFloats(0, pageSize, floatPage, 0);
} else {
double[] doubleData = columnPage.getDoublePage();
vector.putDoubles(0, pageSize, doubleData, 0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
index a7f7a4e..b4bc20c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
@@ -23,7 +23,7 @@ public class DecimalType extends DataType {
private int scale;
// create a decimal type object with specified precision and scale
- DecimalType(int precision, int scale) {
+ public DecimalType(int precision, int scale) {
super(DataTypes.DECIMAL_TYPE_ID, 8, "DECIMAL", -1);
this.precision = precision;
this.scale = scale;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
index e8559b2..4725fb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
@@ -23,7 +23,7 @@ public class StructType extends DataType {
private List<StructField> fields;
- StructType(List<StructField> fields) {
+ public StructType(List<StructField> fields) {
super(DataTypes.STRUCT_TYPE_ID, 10, "STRUCT", -1);
this.fields = fields;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index a11682b..f89ad9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -70,7 +70,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
byteArr = new byte[batchSize];
} else if (dataType == DataTypes.SHORT) {
shorts = new short[batchSize];
- } else if (dataType == DataTypes.INT) {
+ } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
ints = new int[batchSize];
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
longs = new long[batchSize];
@@ -80,7 +80,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
doubles = new double[batchSize];
} else if (dataType instanceof DecimalType) {
decimals = new BigDecimal[batchSize];
- } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+ || dataType == DataTypes.VARCHAR) {
dictionaryVector = new CarbonColumnVectorImpl(batchSize, DataTypes.INT);
bytes = new byte[batchSize][];
} else {
@@ -207,7 +208,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
return byteArr[rowId];
} else if (dataType == DataTypes.SHORT) {
return shorts[rowId];
- } else if (dataType == DataTypes.INT) {
+ } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
return ints[rowId];
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
return longs[rowId];
@@ -217,7 +218,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
return doubles[rowId];
} else if (dataType instanceof DecimalType) {
return decimals[rowId];
- } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+ || dataType == DataTypes.VARCHAR) {
if (null != carbonDictionary) {
int dictKey = (Integer) dictionaryVector.getData(rowId);
return carbonDictionary.getDictionaryValue(dictKey);
@@ -243,7 +245,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
return doubles;
} else if (dataType instanceof DecimalType) {
return decimals;
- } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY || dataType ==
+ DataTypes.VARCHAR) {
if (null != carbonDictionary) {
return ints;
}
@@ -259,7 +262,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
Arrays.fill(byteArr, (byte) 0);
} else if (dataType == DataTypes.SHORT) {
Arrays.fill(shorts, (short) 0);
- } else if (dataType == DataTypes.INT) {
+ } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
Arrays.fill(ints, 0);
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
Arrays.fill(longs, 0);
@@ -269,7 +272,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
Arrays.fill(doubles, 0);
} else if (dataType instanceof DecimalType) {
Arrays.fill(decimals, null);
- } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+ || dataType == DataTypes.VARCHAR) {
Arrays.fill(bytes, null);
this.dictionaryVector.reset();
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c286d81..8988dc3 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -456,6 +456,14 @@ public CarbonWriterBuilder writtenBy(String appName) {
public CarbonWriter build() throws IOException, InvalidLoadOptionException;
```
+```
+ /**
+ * Configure Row Record Reader for reading.
+ *
+ */
+ public CarbonReaderBuilder withRowRecordReader()
+```
+
### Class org.apache.carbondata.sdk.file.CarbonWriter
```
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index d447320..3ff5093 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.hadoop;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -24,7 +25,9 @@ import java.util.Map;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -78,6 +81,18 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
List<CarbonInputSplit> splitList;
if (inputSplit instanceof CarbonInputSplit) {
splitList = new ArrayList<>(1);
+ String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+ // BlockFooterOffSet will be null in case of CarbonVectorizedReader as this has to be set
+ // where multiple threads are able to read small set of files to calculate footer instead
+ // of the main thread setting this for all the files.
+ if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+ FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
+ context.getConfiguration());
+ ByteBuffer buffer = reader
+ .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath), inputSplit.getLength() - 8,
+ 8);
+ ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+ }
splitList.add((CarbonInputSplit) inputSplit);
} else if (inputSplit instanceof CarbonMultiBlockSplit) {
// contains multiple blocks, this is an optimization for concurrent query.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index fcfb346..8b43190 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -21,14 +21,20 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -36,11 +42,13 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -59,6 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
+
public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
CarbonTable carbonTableTemp;
if (carbonTable == null) {
@@ -145,9 +154,35 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
externalTableSegments.add(seg);
}
}
- // do block filtering and get split
- List<InputSplit> splits =
- getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+ List<InputSplit> splits = new ArrayList<>();
+ boolean useBlockDataMap = job.getConfiguration().getBoolean("filter_blocks", true);
+ // useBlockDataMap would be false in case of SDK when user has not provided any filter, In
+ // this case we dont want to load block/blocklet datamap. It would be true in all other
+ // scenarios
+ if (useBlockDataMap) {
+ // do block filtering and get split
+ splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+ } else {
+ for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
+ // Segment id is set to null because SDK does not write carbondata files with respect
+ // to segments. So no specific name is present for this load.
+ CarbonInputSplit split =
+ new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0,
+ carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
+ split.setVersion(ColumnarFormatVersion.V3);
+ BlockletDetailInfo info = new BlockletDetailInfo();
+ split.setDetailInfo(info);
+ info.setBlockSize(carbonFile.getLength());
+ info.setVersionNumber(split.getVersion().number());
+ info.setUseMinMaxForPruning(false);
+ splits.add(split);
+ }
+ Collections.sort(splits, new Comparator<InputSplit>() {
+ @Override public int compare(InputSplit o1, InputSplit o2) {
+ return ((CarbonInputSplit) o1).getPath().compareTo(((CarbonInputSplit) o2).getPath());
+ }
+ });
+ }
if (getColumnProjection(job.getConfiguration()) == null) {
// If the user projection is empty, use default all columns as projections.
// All column name will be filled inside getSplits, so can update only here.
@@ -159,6 +194,20 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
return null;
}
+ private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
+ List<CarbonFile> carbonFiles;
+ try {
+ carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().contains(CarbonTablePath.CARBON_DATA_EXT);
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return carbonFiles;
+ }
+
/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
new file mode 100644
index 0000000..9d3d7d6
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * A specialized RecordReader that reads into CarbonColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+
+ private CarbonColumnarBatch carbonColumnarBatch;
+
+ private QueryExecutor queryExecutor;
+
+ private int batchIdx = 0;
+
+ private int numBatched = 0;
+
+ private AbstractDetailQueryResultIterator iterator;
+
+ private QueryModel queryModel;
+
+ public CarbonVectorizedRecordReader(QueryModel queryModel) {
+ this.queryModel = queryModel;
+ }
+
+ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ // Read the footer offset and set.
+ String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+ if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+ FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
+ taskAttemptContext.getConfiguration());
+ ByteBuffer buffer = reader
+ .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath),
+ ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8,
+ 8);
+ ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+ }
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else {
+ throw new RuntimeException("unsupported input split type: " + inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
+ iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+ initBatch();
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e);
+ throw new InterruptedException(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw e;
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) return false;
+ }
+ ++batchIdx;
+ return true;
+ }
+
+
+ private boolean nextBatch() {
+ carbonColumnarBatch.reset();
+ if (iterator.hasNext()) {
+ iterator.processNextBatch(carbonColumnarBatch);
+ numBatched = carbonColumnarBatch.getActualSize();
+ batchIdx = 0;
+ return true;
+ }
+ return false;
+ }
+
+ private void initBatch() {
+ if (carbonColumnarBatch == null) {
+ List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+ StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+ for (ProjectionDimension dim : queryDimension) {
+ fields[dim.getOrdinal()] =
+ new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+ }
+ for (ProjectionMeasure msr : queryMeasures) {
+ DataType dataType = msr.getMeasure().getDataType();
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
+ || dataType == DataTypes.INT || dataType == DataTypes.LONG
+ || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
+ fields[msr.getOrdinal()] =
+ new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+ } else if (DataTypes.isDecimal(dataType)) {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+ new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
+ } else {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
+ }
+ }
+ CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ vectors[i] = new CarbonColumnVectorImpl(
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+ fields[i].getDataType());
+ }
+ carbonColumnarBatch = new CarbonColumnarBatch(vectors,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+ new boolean[] {});
+ }
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
+ rowCount += 1;
+ Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
+ for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
+ if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
+ || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
+ byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+ row[i] = ByteUtil.toString(data, 0, data.length);
+ } else if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.BOOLEAN) {
+ byte data = (byte) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+ row[i] = ByteUtil.toBoolean(data);
+ } else {
+ row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+ }
+ }
+ return row;
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Operation not allowed on CarbonVectorizedReader");
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ // TODO : Implement it based on total number of rows it is going to retrieve.
+ return 0;
+ }
+
+ @Override public void close() throws IOException {
+ logStatistics(rowCount, queryModel.getStatisticsRecorder());
+ if (carbonColumnarBatch != null) {
+ carbonColumnarBatch = null;
+ }
+ if (iterator != null) {
+ iterator.close();
+ }
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index cc3fd72..b04f0c5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -28,10 +28,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -51,6 +54,7 @@ public class CarbonReaderBuilder {
private Expression filterExpression;
private String tableName;
private Configuration hadoopConf;
+ private boolean useVectorReader = true;
/**
* Construct a CarbonReaderBuilder with table path and table name
@@ -119,6 +123,15 @@ public class CarbonReaderBuilder {
}
/**
+ * Switches this builder to the row-based record reader instead of the
+ * default vectorized reader (useVectorReader defaults to true).
+ *
+ * @return this builder instance, for call chaining
+ */
+ public CarbonReaderBuilder withRowRecordReader() {
+ this.useVectorReader = false;
+ return this;
+ }
+
+ /**
* Build CarbonReader
*
* @param <T>
@@ -158,14 +171,31 @@ public class CarbonReaderBuilder {
}
try {
- final List<InputSplit> splits =
- format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+ if (filterExpression == null) {
+ job.getConfiguration().set("filter_blocks", "false");
+ }
+ List<InputSplit> splits =
+ format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
for (InputSplit split : splits) {
TaskAttemptContextImpl attempt =
new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader reader = format.createRecordReader(split, attempt);
+ RecordReader reader;
+ QueryModel queryModel = format.createQueryModel(split, attempt);
+ boolean hasComplex = false;
+ for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
+ if (projectionDimension.getDimension().isComplex()) {
+ hasComplex = true;
+ break;
+ }
+ }
+ if (useVectorReader && !hasComplex) {
+ queryModel.setDirectVectorFill(filterExpression == null);
+ reader = new CarbonVectorizedRecordReader(queryModel);
+ } else {
+ reader = format.createRecordReader(split, attempt);
+ }
try {
reader.initialize(split, attempt);
readers.add(reader);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ed697d7..26078ed 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -454,7 +454,8 @@ public class CSVCarbonWriterTest {
writer.close();
CarbonReader carbonReader =
new CarbonReaderBuilder(path, "table1").build();
- for (int i = 0; i < 15; i++) {
+ int i = 0;
+ while(carbonReader.hasNext()) {
Object[] actualRow = (Object[]) carbonReader.readNextRow();
String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + "." + i };
for (int j = 0; j < 3; j++) {
@@ -462,6 +463,7 @@ public class CSVCarbonWriterTest {
}
assert(actualRow[1] instanceof Byte);
assert(actualRow[2] instanceof Float);
+ i++;
}
carbonReader.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 74428f0..9fb34d4 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -28,7 +28,9 @@ import org.apache.log4j.Logger;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.common.logging.impl.Audit;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
@@ -63,7 +65,8 @@ public class CarbonReaderTest extends TestCase {
public void testWriteAndReadFiles() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -116,7 +119,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
String path1 = path + "/0testdir";
String path2 = path + "/testdir";
@@ -162,7 +166,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -198,7 +203,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -240,7 +246,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -282,7 +289,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalGreaterThan() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -324,7 +332,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalLessThan() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -366,7 +375,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalNotEqual() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -415,7 +425,8 @@ public class CarbonReaderTest extends TestCase {
fields[2] = new Field("doubleField", DataTypes.DOUBLE);
TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
new LiteralExpression("13.5", DataTypes.DOUBLE));
@@ -450,7 +461,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadWithFilterOfNonTransactionalNotIn() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -494,7 +506,10 @@ public class CarbonReaderTest extends TestCase {
String path2 = "./testWriteFiles2";
FileUtils.deleteDirectory(new File(path1));
FileUtils.deleteDirectory(new File(path2));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path1));
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path2));
Field[] fields = new Field[] { new Field("c1", "string"),
new Field("c2", "int") };
Schema schema = new Schema(fields);
@@ -551,7 +566,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadColumnTwice() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -595,7 +611,8 @@ public class CarbonReaderTest extends TestCase {
public void readFilesParallel() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -630,7 +647,8 @@ public class CarbonReaderTest extends TestCase {
public void testReadAfterClose() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -670,7 +688,8 @@ public class CarbonReaderTest extends TestCase {
public void testWriteAndReadFilesWithoutTableName() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -708,7 +727,8 @@ public class CarbonReaderTest extends TestCase {
public void testWriteAndReadFilesWithoutTableName2() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
-
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(AbsoluteTableIdentifier.from(path));
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
@@ -902,7 +922,7 @@ public class CarbonReaderTest extends TestCase {
Assert.assertTrue(dataFiles.length > 0);
CarbonReader reader = CarbonReader.builder(path, "_temp")
-
+
.projection(new String[]{
"stringField"
, "shortField"
@@ -1701,6 +1721,7 @@ public class CarbonReaderTest extends TestCase {
// Read data
CarbonReader reader = CarbonReader
.builder(path, "_temp")
+ .withRowRecordReader()
.build();
int i = 0;
@@ -1724,6 +1745,91 @@ public class CarbonReaderTest extends TestCase {
assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
i++;
}
+ assert (i == 10);
+ reader.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testVectorReader() {
+ String path = "./testWriteFiles";
+ try {
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[12];
+ fields[0] = new Field("stringField", DataTypes.STRING);
+ fields[1] = new Field("shortField", DataTypes.SHORT);
+ fields[2] = new Field("intField", DataTypes.INT);
+ fields[3] = new Field("longField", DataTypes.LONG);
+ fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+ fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+ fields[6] = new Field("dateField", DataTypes.DATE);
+ fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+ fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+ fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+ fields[10] = new Field("byteField", DataTypes.BYTE);
+ fields[11] = new Field("floatField", DataTypes.FLOAT);
+ Map<String, String> map = new HashMap<>();
+ map.put("complex_delimiter_level_1", "#");
+ CarbonWriter writer = CarbonWriter.builder()
+ .outputPath(path)
+ .withLoadOptions(map)
+ .withCsvInput(new Schema(fields))
+ .writtenBy("CarbonReaderTest")
+ .build();
+
+ for (int i = 0; i < 10; i++) {
+ String[] row2 = new String[]{
+ "robot" + (i % 10),
+ String.valueOf(i % 10000),
+ String.valueOf(i),
+ String.valueOf(Long.MAX_VALUE - i),
+ String.valueOf((double) i / 2),
+ String.valueOf(true),
+ "2019-03-02",
+ "2019-02-12 03:03:34",
+ "12.345",
+ "varchar",
+ String.valueOf(i),
+ "1.23"
+ };
+ writer.write(row2);
+ }
+ writer.close();
+
+ // Read data
+ CarbonReader reader = CarbonReader
+ .builder(path, "_temp")
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] data = (Object[]) reader.readNextRow();
+
+ assert (RowUtil.getString(data, 0).equals("robot" + i));
+ assertEquals(RowUtil.getShort(data, 4), i);
+ assertEquals(RowUtil.getInt(data, 5), i);
+ assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
+ assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
+ assert (RowUtil.getBoolean(data, 8));
+ assertEquals(RowUtil.getInt(data, 1), 17957);
+ assert (RowUtil.getDecimal(data, 9).equals("12.35"));
+ assert (RowUtil.getString(data, 3).equals("varchar"));
+ assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i)));
+ assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
+ i++;
+ }
+ assert(i==10);
reader.close();
} catch (Throwable e) {
e.printStackTrace();