You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/03/29 09:07:06 UTC
[1/4] incubator-carbondata git commit: [CARBONDATA-826] Create
carbondata-connector for query carbon data in presto
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 08a6e8161 -> bd5114097
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 45c905a..1d0eeed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -83,618 +83,614 @@ import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
public class CarbonTableReader {
- //CarbonTableReader will be a facade of these utils
- //[
- // 1:CarbonMetadata,(logic table)
- // 2:FileFactory, (physic table file)
- // 3:CarbonCommonFactory, (offer some )
- // 4:DictionaryFactory, (parse dictionary util)
- //]
-
- private CarbonTableConfig config;
- private List<SchemaTableName> tableList;
- private CarbonFile dbStore;
- private FileFactory.FileType fileType;
-
- private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;//as a cache for Carbon reader
-
- @Inject
- public CarbonTableReader(CarbonTableConfig config){
- this.config = requireNonNull(config, "CarbonTableConfig is null");
- this.cc = new ConcurrentHashMap<>();
- }
-
- public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
- if(!cc.containsKey(table))//for worker node to initalize carbon metastore
- {
- try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
- if(dbStore == null) {
- fileType = FileFactory.getFileType(config.getStorePath());
- try{
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
- }catch (Exception ex){
- throw new RuntimeException(ex);
- }
- }
- }
- updateSchemaTables();
- parseCarbonMetadata(table);
+ //CarbonTableReader will be a facade of these utils
+ //[
+ // 1:CarbonMetadata,(logic table)
+  // 2:FileFactory, (physical table file)
+ // 3:CarbonCommonFactory, (offer some )
+ // 4:DictionaryFactory, (parse dictionary util)
+ //]
+
+ private CarbonTableConfig config;
+ private List<SchemaTableName> tableList;
+ private CarbonFile dbStore;
+ private FileFactory.FileType fileType;
+
+ private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+ //as a cache for Carbon reader
+
+ @Inject public CarbonTableReader(CarbonTableConfig config) {
+ this.config = requireNonNull(config, "CarbonTableConfig is null");
+ this.cc = new ConcurrentHashMap<>();
+ }
+
+ public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+    if (!cc.containsKey(table))//for worker node to initialize carbon metastore
+ {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
+ FileFactory.class.getClassLoader())) {
+ if (dbStore == null) {
+ fileType = FileFactory.getFileType(config.getStorePath());
+ try {
+ dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
-
- if(cc.containsKey(table))
- return cc.get(table);
- else
- return null;//need to reload?*/
+ }
+ updateSchemaTables();
+ parseCarbonMetadata(table);
}
- public List<String> getSchemaNames() {
- return updateSchemaList();
- }
-
- //default PathFilter
- private static final PathFilter DefaultFilter = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return CarbonTablePath.isCarbonDataFile(path.getName());
- }
- };
-
- public boolean updateDbStore(){
- if(dbStore == null) {
- fileType = FileFactory.getFileType(config.getStorePath());
- try{
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
- }catch (Exception ex){
- throw new RuntimeException(ex);
- }
- }
- return true;
- }
+ if (cc.containsKey(table)) return cc.get(table);
+ else return null;//need to reload?*/
+ }
- public List<String> updateSchemaList() {
- updateDbStore();
+ public List<String> getSchemaNames() {
+ return updateSchemaList();
+ }
- if(dbStore != null){
- List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
- return scs;
- }
- else
- return ImmutableList.of();
+ //default PathFilter
+ private static final PathFilter DefaultFilter = new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return CarbonTablePath.isCarbonDataFile(path.getName());
}
-
-
- public Set<String> getTableNames(String schema) {
- requireNonNull(schema, "schema is null");
- return updateTableList(schema);
+ };
+
+ public boolean updateDbStore() {
+ if (dbStore == null) {
+ fileType = FileFactory.getFileType(config.getStorePath());
+ try {
+ dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
-
- public Set<String> updateTableList(String dbName){
- List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList());
- if(schema.size() > 0)
- {
- return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet());
- }
- else
- return ImmutableSet.of();
+ return true;
+ }
+
+ public List<String> updateSchemaList() {
+ updateDbStore();
+
+ if (dbStore != null) {
+ List<String> scs =
+ Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+ return scs;
+ } else return ImmutableList.of();
+ }
+
+ public Set<String> getTableNames(String schema) {
+ requireNonNull(schema, "schema is null");
+ return updateTableList(schema);
+ }
+
+ public Set<String> updateTableList(String dbName) {
+ List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+ .collect(Collectors.toList());
+ if (schema.size() > 0) {
+ return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+ .collect(Collectors.toSet());
+ } else return ImmutableSet.of();
+ }
+
+ public CarbonTable getTable(SchemaTableName schemaTableName) {
+ try {
+ updateSchemaTables();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- public CarbonTable getTable(SchemaTableName schemaTableName) {
- try {
- updateSchemaTables();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ requireNonNull(schemaTableName, "schemaTableName is null");
+ CarbonTable table = loadTableMetadata(schemaTableName);
- requireNonNull(schemaTableName, "schemaTableName is null");
- CarbonTable table = loadTableMetadata(schemaTableName);
+ return table;
+ }
- return table;
+ public void updateSchemaTables() {
+ //update logic determine later
+ if (dbStore == null) {
+ updateSchemaList();
}
-
- public void updateSchemaTables()
- {
- //update logic determine later
- if(dbStore == null)
- {
- updateSchemaList();
- }
-
- tableList = new LinkedList<>();
- for(CarbonFile db: dbStore.listFiles())
- {
- if(!db.getName().endsWith(".mdt")) {
- for (CarbonFile table : db.listFiles()) {
- tableList.add(new SchemaTableName(db.getName(), table.getName()));
- }
- }
+ tableList = new LinkedList<>();
+ for (CarbonFile db : dbStore.listFiles()) {
+ if (!db.getName().endsWith(".mdt")) {
+ for (CarbonFile table : db.listFiles()) {
+ tableList.add(new SchemaTableName(db.getName(), table.getName()));
}
+ }
}
+ }
- private CarbonTable loadTableMetadata(SchemaTableName schemaTableName)
- {
- for (SchemaTableName table : tableList) {
- if (!table.equals(schemaTableName))
- continue;
+ private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+ for (SchemaTableName table : tableList) {
+ if (!table.equals(schemaTableName)) continue;
- return parseCarbonMetadata(table);
- }
- return null;
+ return parseCarbonMetadata(table);
}
-
- /**
- * parse carbon metadata into cc(CarbonTableReader cache)
- **/
- public CarbonTable parseCarbonMetadata(SchemaTableName table)
- {
- CarbonTable result = null;
- try {
- //\u8fd9\u4e2a\u5e94\u8be5\u653e\u5728StoreFactory
- CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
- if(cache.isValid())
- return cache.carbonTable;
-
- //Step1: get table meta path, load carbon table param
- String storePath = config.getStorePath();
- cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString());
- cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
- cc.put(table, cache);
-
- //Step2: check file existed? read schema file
- ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
- public TBase create() {
- return new org.apache.carbondata.format.TableInfo();
- }
- };
- ThriftReader thriftReader =
- new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
- thriftReader.open();
- org.apache.carbondata.format.TableInfo tableInfo =
- (org.apache.carbondata.format.TableInfo) thriftReader.read();
- thriftReader.close();
-
- //Format Level\u7684TableInfo\uff0c \u9700\u8981\u8f6c\u6362\u6210Code Level\u7684TableInfo
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
- storePath);
- wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
- //\u52a0\u8f7d\u5230CarbonMetadata\u4ed3\u5e93
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
- cache.tableInfo = wrapperTableInfo;
- cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
- result = cache.carbonTable;
- }catch (Exception ex) {
- throw new RuntimeException(ex);
+ return null;
+ }
+
+ /**
+ * parse carbon metadata into cc(CarbonTableReader cache)
+ **/
+ public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+ CarbonTable result = null;
+ try {
+ //\u8fd9\u4e2a\u5e94\u8be5\u653e\u5728StoreFactory
+ CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+ if (cache.isValid()) return cache.carbonTable;
+
+ //Step1: get table meta path, load carbon table param
+ String storePath = config.getStorePath();
+ cache.carbonTableIdentifier =
+ new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+ UUID.randomUUID().toString());
+ cache.carbonTablePath =
+ PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+ cc.put(table, cache);
+
+ //Step2: check file existed? read schema file
+ ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ public TBase create() {
+ return new org.apache.carbondata.format.TableInfo();
}
-
- return result;
+ };
+ ThriftReader thriftReader =
+ new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+ thriftReader.open();
+ org.apache.carbondata.format.TableInfo tableInfo =
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+
+ //Format Level\u7684TableInfo\uff0c \u9700\u8981\u8f6c\u6362\u6210Code Level\u7684TableInfo
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+ storePath);
+ wrapperTableInfo.setMetaDataFilepath(
+ CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+ //\u52a0\u8f7d\u5230CarbonMetadata\u4ed3\u5e93
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+ cache.tableInfo = wrapperTableInfo;
+ cache.carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+ result = cache.carbonTable;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
- public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
+ return result;
+ }
- //\u5904\u7406filter, \u4e0b\u63a8filter\uff0c\u5c06\u5e94\u7528\u5728Segment\u7684\u7d22\u5f15\u4e0a
- FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+ Expression filters) throws Exception {
- AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
- CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
- List<String> invalidSegments = new ArrayList<>();
- List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+ //\u5904\u7406filter, \u4e0b\u63a8filter\uff0c\u5c06\u5e94\u7528\u5728Segment\u7684\u7d22\u5f15\u4e0a
+ FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
- // get all valid segments and set them into the configuration
- SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
- SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+ CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
- tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
- if (segments.getValidSegments().size() == 0) {
- return new ArrayList<>(0);
- }
+ // get all valid segments and set them into the configuration
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+ segmentStatusManager.getValidAndInvalidSegments();
- // remove entry in the segment index if there are invalid segments
- invalidSegments.addAll(segments.getInvalidSegments());
- for (String invalidSegmentId : invalidSegments) {
- invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
- }
- if (invalidSegments.size() > 0) {
- List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
- for(String segId: invalidSegments) {
- invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
- }
- cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
- }
-
- // get filter for segment
- CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
- FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
-
- List<CarbonLocalInputSplit> result = new ArrayList<>();
- //for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : tableCacheModel.segments) {
- try{
- List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
- for (DataRefNode dataRefNode : dataRefNodes) {
- BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
- TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
- continue;
- }
- result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
- tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
- Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
- tableBlockInfo.getVersion().number()));
- }
- }catch (Exception ex){
- throw new RuntimeException(ex);
- }
- }
- cacheClient.close();
- return result;
+ tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+ if (segments.getValidSegments().size() == 0) {
+ return new ArrayList<>(0);
}
- /**
- * get data blocks of given segment
- */
- private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
- AbsoluteTableIdentifier absoluteTableIdentifier,
- CarbonTablePath tablePath,
- FilterResolverIntf resolver,
- String segmentId,
- CacheClient cacheClient,
- SegmentUpdateStatusManager updateStatusManager) throws IOException {
- //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
- //QueryStatistic statistic = new QueryStatistic();
-
- //\u8bfb\u53d6Segment \u5185\u90e8\u7684Index
- Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
- getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
- updateStatusManager);
-
- List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
-
- if (null != segmentIndexMap) {
- // build result
- for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
- List<DataRefNode> filterredBlocks;
- // if no filter is given get all blocks from Btree Index
- if (null == resolver) {
- filterredBlocks = getDataBlocksOfIndex(abstractIndex);
- } else {
- //ignore filter
- //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-
- // apply filter and get matching blocks
- filterredBlocks = filterExpressionProcessor
- .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
- absoluteTableIdentifier);
- }
- resultFilterredBlocks.addAll(filterredBlocks);
- }
- }
- //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
- //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
- return resultFilterredBlocks;
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+ new ArrayList<>(invalidSegments.size());
+ for (String segId : invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+ }
+ cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
}
- private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
- UpdateVO updateDetails) {
- if (null != updateDetails.getLatestUpdateTimestamp()
- && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
- .getRefreshedTimeStamp()) {
- return true;
+ // get filter for segment
+ CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil
+ .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+ List<CarbonLocalInputSplit> result = new ArrayList<>();
+ //for each segment fetch blocks matching filter in Driver BTree
+ for (String segmentNo : tableCacheModel.segments) {
+ try {
+ List<DataRefNode> dataRefNodes =
+ getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
+ tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
+ updateStatusManager);
+ for (DataRefNode dataRefNode : dataRefNodes) {
+ BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+ TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+ if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+ updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+ updateStatusManager)) {
+ continue;
+ }
+ result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+ tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+ Arrays.asList(tableBlockInfo.getLocations()),
+ tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+ tableBlockInfo.getVersion().number()));
}
- return false;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
-
- private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
- AbsoluteTableIdentifier absoluteTableIdentifier,
- CarbonTablePath tablePath,
- String segmentId,
- CacheClient cacheClient,
- SegmentUpdateStatusManager updateStatusManager) throws IOException {
- Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
- SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
- boolean isSegmentUpdated = false;
- Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
- TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
- new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
- segmentTaskIndexWrapper =
- cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
- UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
- if (null != segmentTaskIndexWrapper) {
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
- if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
- taskKeys = segmentIndexMap.keySet();
- isSegmentUpdated = true;
- }
+ cacheClient.close();
+ return result;
+ }
+
+ /**
+ * get data blocks of given segment
+ */
+ private List<DataRefNode> getDataBlocksOfSegment(
+ FilterExpressionProcessor filterExpressionProcessor,
+ AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
+ FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
+ SegmentUpdateStatusManager updateStatusManager) throws IOException {
+ //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
+ //QueryStatistic statistic = new QueryStatistic();
+
+ //\u8bfb\u53d6Segment \u5185\u90e8\u7684Index
+ Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+ getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+ updateStatusManager);
+
+ List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+
+ if (null != segmentIndexMap) {
+ // build result
+ for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+ List<DataRefNode> filterredBlocks;
+ // if no filter is given get all blocks from Btree Index
+ if (null == resolver) {
+ filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+ } else {
+ //ignore filter
+ //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+
+ // apply filter and get matching blocks
+ filterredBlocks = filterExpressionProcessor
+ .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+ absoluteTableIdentifier);
}
+ resultFilterredBlocks.addAll(filterredBlocks);
+ }
+ }
+ //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
+ return resultFilterredBlocks;
+ }
+
+ private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+ UpdateVO updateDetails) {
+ if (null != updateDetails.getLatestUpdateTimestamp()
+ && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+ .getRefreshedTimeStamp()) {
+ return true;
+ }
+ return false;
+ }
+
+ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
+ AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
+ CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
+ Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+ SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+ boolean isSegmentUpdated = false;
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+ TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+ new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+ segmentTaskIndexWrapper =
+ cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+ UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+ if (null != segmentTaskIndexWrapper) {
+ segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+ taskKeys = segmentIndexMap.keySet();
+ isSegmentUpdated = true;
+ }
+ }
- // if segment tree is not loaded, load the segment tree
- if (segmentIndexMap == null || isSegmentUpdated) {
-
- List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
- List<String> segs = new ArrayList<>();
- segs.add(segmentId);
+ // if segment tree is not loaded, load the segment tree
+ if (segmentIndexMap == null || isSegmentUpdated) {
- FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
- List<InputSplit> splits = getSplit(fileStatusList, fs);
+ List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+ List<String> segs = new ArrayList<>();
+ segs.add(segmentId);
- List<FileSplit> carbonSplits = new ArrayList<>();
- for (InputSplit inputSplit : splits) {
- FileSplit fileSplit = (FileSplit) inputSplit;
- String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());//\u8fd9\u91cc\u7684seperator\u5e94\u8be5\u600e\u4e48\u52a0\uff1f\uff1f
- if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
- continue;
- }
- carbonSplits.add(fileSplit);
- }
+ FileSystem fs =
+ getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
+ List<InputSplit> splits = getSplit(fileStatusList, fs);
- List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
- for (FileSplit inputSplit : carbonSplits) {
- if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
-
- BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
- tableBlockInfoList.add(
- new TableBlockInfo(inputSplit.getPath().toString(),
- inputSplit.getStart(),
- segmentId,
- inputSplit.getLocations(),
- inputSplit.getLength(),
- blockletInfos,
- ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
- null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//\u8fd9\u91cc\u7684null\u662f\u5426\u4f1a\u5f02\u5e38\uff1f
- }
- }
-
- Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
- segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
- // get Btree blocks for given segment
- tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
- tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
- segmentTaskIndexWrapper =
- cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ List<FileSplit> carbonSplits = new ArrayList<>();
+ for (InputSplit inputSplit : splits) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ String segId = CarbonTablePath.DataPathUtil
+ .getSegmentId(fileSplit.getPath().toString());//\u8fd9\u91cc\u7684seperator\u5e94\u8be5\u600e\u4e48\u52a0\uff1f\uff1f
+ if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+ continue;
}
- return segmentIndexMap;
+ carbonSplits.add(fileSplit);
+ }
+
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+ for (FileSplit inputSplit : carbonSplits) {
+ if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
+ updateStatusManager, segmentId)) {
+
+ BlockletInfos blockletInfos = new BlockletInfos(0, 0,
+ 0);//this level we do not need blocklet info!!!! Is this a trick?
+ tableBlockInfoList.add(
+ new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
+ inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
+ ColumnarFormatVersion
+ .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//\u8fd9\u91cc\u7684null\u662f\u5426\u4f1a\u5f02\u5e38\uff1f
+ }
+ }
+
+ Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+ segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+ // get Btree blocks for given segment
+ tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+ tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+ segmentTaskIndexWrapper =
+ cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+ segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
}
+ return segmentIndexMap;
+ }
+
+ private boolean isValidBlockBasedOnUpdateDetails(
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+ UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+ String taskID = null;
+ if (null != carbonInputSplit) {
+ if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+ return false;
+ }
- private boolean isValidBlockBasedOnUpdateDetails(
- Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
- UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
- String taskID = null;
- if (null != carbonInputSplit) {
- if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
- return false;
- }
-
- if (null == taskKeys) {
- return true;
+ if (null == taskKeys) {
+ return true;
+ }
+
+ taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+ String bucketNo =
+ CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+ SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+ new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+ String blockTimestamp = carbonInputSplit.getPath().getName()
+ .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+ carbonInputSplit.getPath().getName().lastIndexOf('.'));
+ if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+ && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+ if (!taskKeys.contains(taskBucketHolder)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+ throws IOException {
+
+ Iterator split = fileStatusList.iterator();
+
+ List<InputSplit> splits = new ArrayList<>();
+
+ while (true) {
+ while (true) {
+ while (split.hasNext()) {
+ FileStatus file = (FileStatus) split.next();
+ Path path = file.getPath();
+ long length = file.getLen();
+ if (length != 0L) {
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
}
- taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
- String bucketNo =
- CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
-
- SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
- new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
-
- String blockTimestamp = carbonInputSplit.getPath().getName()
- .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
- carbonInputSplit.getPath().getName().lastIndexOf('.'));
- if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
- && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
- if (!taskKeys.contains(taskBucketHolder)) {
- return true;
- }
+ if (this.isSplitable()) {
+ long blockSize1 = file.getBlockSize();
+ long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+ long bytesRemaining;
+ int blkIndex;
+ for (
+ bytesRemaining = length;
+ (double) bytesRemaining / (double) splitSize > 1.1D;
+ bytesRemaining -= splitSize) {
+ blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
+ blkLocations[blkIndex].getHosts()));
+ }
+
+ if (bytesRemaining != 0L) {
+ blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
+ blkLocations[blkIndex].getHosts()));
+ }
+ } else {
+ splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+ blkLocations[0].getHosts()));
}
+ } else {
+ splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+ new String[0]));
+ }
}
- return false;
+ return splits;
+ }
}
- private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) throws IOException {
-
- Iterator split = fileStatusList.iterator();
-
- List<InputSplit> splits = new ArrayList<>();
-
- while (true)
- {
- while (true)
- {
- while(split.hasNext()) {
- FileStatus file = (FileStatus) split.next();
- Path path = file.getPath();
- long length = file.getLen();
- if (length != 0L) {
- BlockLocation[] blkLocations;
- if (file instanceof LocatedFileStatus) {
- blkLocations = ((LocatedFileStatus) file).getBlockLocations();
- } else {
- blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
- }
-
- if (this.isSplitable()) {
- long blockSize1 = file.getBlockSize();
- long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
-
- long bytesRemaining;
- int blkIndex;
- for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) {
- blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
- }
-
- if (bytesRemaining != 0L) {
- blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
- }
- }
- else
- {
- splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, blkLocations[0].getHosts()));
- }
- }
- else {
- splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, new String[0]));
- }
- }
- return splits;
- }
- }
+ }
- }
+ private String[] getValidPartitions() {
+ //TODO: has to Identify partitions by partition pruning
+ return new String[] { "0" };
+ }
- private String[] getValidPartitions() {
- //TODO: has to Identify partitions by partition pruning
- return new String[] { "0" };
+ private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
+ List<FileStatus> result) throws IOException {
+ String[] partitionsToConsider = getValidPartitions();
+ if (partitionsToConsider.length == 0) {
+ throw new IOException("No partitions/data found");
}
- private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
- CarbonTablePath tablePath,
- List<FileStatus> result) throws IOException {
- String[] partitionsToConsider = getValidPartitions();
- if (partitionsToConsider.length == 0) {
- throw new IOException("No partitions/data found");
- }
-
- FileSystem fs = null;
+ FileSystem fs = null;
- //PathFilter inputFilter = getDataFileFilter(job);
+ //PathFilter inputFilter = getDataFileFilter(job);
- // get tokens for all the required FileSystem for table path
+ // get tokens for all the required FileSystem for table path
/*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
job.getConfiguration());*/
- //get all data files of valid partitions and segments
- for (int i = 0; i < partitionsToConsider.length; ++i) {
- String partition = partitionsToConsider[i];
-
- for (int j = 0; j < segmentsToConsider.length; ++j) {
- String segmentId = segmentsToConsider[j];
- Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
-
- try{
- Configuration conf = new Configuration();
- fs = segmentPath.getFileSystem(conf);
- //fs.initialize(segmentPath.toUri(), conf);
-
- RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
- while (iter.hasNext()) {
- LocatedFileStatus stat = iter.next();
- //if(stat.getPath().toString().contains("carbondata"))//\u53c2\u770bcarbondata\u7684carbonInputFilter\u7684\u5b9e\u73b0
- if (DefaultFilter.accept(stat.getPath()))
- {
- if (stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
- } else {
- result.add(stat);
- }
- }
- }
- }catch (Exception ex){
- System.out.println(ex.toString());
- }
- }
- }
- return fs;
- }
-
- protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
- RemoteIterator iter = fs.listLocatedStatus(path);
-
- while(iter.hasNext()) {
- LocatedFileStatus stat = (LocatedFileStatus)iter.next();
- if(inputFilter.accept(stat.getPath())) {
- if(stat.isDirectory()) {
- this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
- }
- }
- }
-
- }
+ //get all data files of valid partitions and segments
+ for (int i = 0; i < partitionsToConsider.length; ++i) {
+ String partition = partitionsToConsider[i];
- /**
- * get data blocks of given btree
- */
- private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
- List<DataRefNode> blocks = new LinkedList<DataRefNode>();
- SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+ for (int j = 0; j < segmentsToConsider.length; ++j) {
+ String segmentId = segmentsToConsider[j];
+ Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
try {
- IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
- IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
-
- // Add all blocks of btree into result
- DataRefNodeFinder blockFinder =
- new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
- DataRefNode startBlock =
- blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
- DataRefNode endBlock =
- blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
- while (startBlock != endBlock) {
- blocks.add(startBlock);
- startBlock = startBlock.getNextDataRefNode();
+ Configuration conf = new Configuration();
+ fs = segmentPath.getFileSystem(conf);
+ //fs.initialize(segmentPath.toUri(), conf);
+
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+          //if(stat.getPath().toString().contains("carbondata"))// see carbondata's CarbonInputFilter implementation for reference
+ if (DefaultFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+ } else {
+ result.add(stat);
+ }
}
- blocks.add(endBlock);
-
- } catch (KeyGenException e) {
- System.out.println("Could not generate start key" + e.getMessage());
+ }
+ } catch (Exception ex) {
+ System.out.println(ex.toString());
}
- return blocks;
+ }
}
-
- private boolean isSplitable() {
- try {
- // Don't split the file if it is local file system
- if(this.fileType == FileFactory.FileType.LOCAL)
- {
- return false;
- }
- } catch (Exception e) {
- return true;
+ return fs;
+ }
+
+ protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
+ PathFilter inputFilter) throws IOException {
+ RemoteIterator iter = fs.listLocatedStatus(path);
+
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = (LocatedFileStatus) iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
}
- return true;
+ }
}
- private long computeSplitSize(long blockSize, long minSize,
- long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
+ }
+
+ /**
+ * get data blocks of given btree
+ */
+ private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+ List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+ SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+ try {
+ IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+ IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+ // Add all blocks of btree into result
+ DataRefNodeFinder blockFinder =
+ new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+ DataRefNode startBlock =
+ blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+ DataRefNode endBlock =
+ blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+ while (startBlock != endBlock) {
+ blocks.add(startBlock);
+ startBlock = startBlock.getNextDataRefNode();
+ }
+ blocks.add(endBlock);
+
+ } catch (KeyGenException e) {
+ System.out.println("Could not generate start key" + e.getMessage());
}
+ return blocks;
+ }
- private FileSplit makeSplit(Path file, long start, long length,
- String[] hosts) {
- return new FileSplit(file, start, length, hosts);
+ private boolean isSplitable() {
+ try {
+ // Don't split the file if it is local file system
+ if (this.fileType == FileFactory.FileType.LOCAL) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
}
-
- private int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0 ; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length -1];
- long fileLength = last.getOffset() + last.getLength() -1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
+ return true;
+ }
+
+ private long computeSplitSize(long blockSize, long minSize, long maxSize) {
+ return Math.max(minSize, Math.min(maxSize, blockSize));
+ }
+
+ private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+ return new FileSplit(file, start, length, hosts);
+ }
+
+ private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+ for (int i = 0; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) && (offset
+ < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+ return i;
+ }
}
-
-
- /**
- * get total number of rows. for count(*)
- *
- * @throws IOException
- * @throws IndexBuilderException
- */
- public long getRowCount() throws IOException, IndexBuilderException {
- long rowCount = 0;
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
+ /**
+ * get total number of rows. for count(*)
+ *
+ * @throws IOException
+ * @throws IndexBuilderException
+ */
+ public long getRowCount() throws IOException, IndexBuilderException {
+ long rowCount = 0;
/*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
// no of core to load the blocks in driver
@@ -733,6 +729,6 @@ public class CarbonTableReader {
} catch (InterruptedException | ExecutionException e) {
throw new IndexBuilderException(e);
}*/
- return rowCount;
- }
+ return rowCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
index cba2562..25d5cd5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
@@ -32,7 +32,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
sql("drop table if exists a12")
- sql("drop table if exists a12_allnull")
+ sql("drop table if exists a12_all_null")
sql("drop table if exists a12_no_null")
sql("drop table if exists Test_Boundary1")
@@ -41,7 +41,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
"dob timestamp,comm decimal(4,2),desc string) stored by 'org.apache.carbondata.format'"
)
sql(
- "create table a12_allnull(empid String,ename String,sal double,deptno int,mgr string,gender" +
+ "create table a12_all_null(empid String,ename String,sal double,deptno int,mgr string,gender" +
" string," +
"dob timestamp,comm decimal(4,2),desc string) stored by 'org.apache.carbondata.format'"
)
@@ -62,7 +62,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
testData = s"$resourcesPath/filter/emp2allnull.csv"
sql(
- s"""LOAD DATA LOCAL INPATH '$testData' into table a12_allnull OPTIONS('DELIMITER'=',',
+ s"""LOAD DATA LOCAL INPATH '$testData' into table a12_all_null OPTIONS('DELIMITER'=',',
'QUOTECHAR'='"','FILEHEADER'='empid,ename,sal,deptno,mgr,gender,dob,comm,desc')"""
.stripMargin
)
@@ -115,34 +115,34 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
test("Less Than Filter all null") {
checkAnswer(
- sql("select count(empid) from a12_allnull where dob < '2014-07-01 12:07:28'"),
+ sql("select count(empid) from a12_all_null where dob < '2014-07-01 12:07:28'"),
Seq(Row(0))
)
}
test("Les Than equal Filter all null") {
checkAnswer(
- sql("select count (empid) from a12_allnull where dob <= '2014-07-01 12:07:28'"),
+ sql("select count (empid) from a12_all_null where dob <= '2014-07-01 12:07:28'"),
Seq(Row(0))
)
}
test("Greater Than Filter all null") {
checkAnswer(
- sql("select count (empid) from a12_allnull where dob > '2014-07-01 12:07:28'"),
+ sql("select count (empid) from a12_all_null where dob > '2014-07-01 12:07:28'"),
Seq(Row(0))
)
}
test("Greater Than equal to Filter all null") {
checkAnswer(
- sql("select count (empid) from a12_allnull where dob >= '2014-07-01 12:07:28'"),
+ sql("select count (empid) from a12_all_null where dob >= '2014-07-01 12:07:28'"),
Seq(Row(0))
)
}
// test("In condition With improper format query regarding Null filter") {
// checkAnswer(
-// sql("select empid from a12_allnull " + "where empid not in ('china',NULL)"),
+// sql("select empid from a12_all_null " + "where empid not in ('china',NULL)"),
// Seq()
// )
// }
@@ -179,7 +179,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
override def afterAll {
sql("drop table a12")
- sql("drop table if exists a12_allnull")
+ sql("drop table if exists a12_all_null")
sql("drop table if exists a12_no_null")
sql("drop table if exists Test_Boundary1")
CarbonProperties.getInstance()
[4/4] incubator-carbondata git commit: [CARBONDATA-826] Create
carbondata-connector of presto This closes #704
Posted by ch...@apache.org.
[CARBONDATA-826] Create carbondata-connector of presto This closes #704
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bd511409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bd511409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bd511409
Branch: refs/heads/master
Commit: bd511409796721f8ef8d5541604c49d6a277a6a0
Parents: 08a6e81 2712330
Author: chenliang613 <ch...@huawei.com>
Authored: Wed Mar 29 14:36:42 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Wed Mar 29 14:36:42 2017 +0530
----------------------------------------------------------------------
integration/presto/README.md | 86 ++
integration/presto/pom.xml | 22 +-
.../presto/CarbondataColumnConstraint.java | 101 +-
.../presto/CarbondataColumnHandle.java | 205 ++--
.../carbondata/presto/CarbondataConnector.java | 87 +-
.../presto/CarbondataConnectorFactory.java | 119 +-
.../presto/CarbondataConnectorId.java | 49 +-
.../presto/CarbondataHandleResolver.java | 36 +-
.../carbondata/presto/CarbondataMetadata.java | 427 ++++---
.../carbondata/presto/CarbondataModule.java | 63 +-
.../carbondata/presto/CarbondataPlugin.java | 14 +-
.../presto/CarbondataRecordCursor.java | 190 ++-
.../carbondata/presto/CarbondataRecordSet.java | 110 +-
.../presto/CarbondataRecordSetProvider.java | 431 ++++---
.../carbondata/presto/CarbondataSplit.java | 110 +-
.../presto/CarbondataSplitManager.java | 449 ++++----
.../presto/CarbondataTableHandle.java | 75 +-
.../presto/CarbondataTableLayoutHandle.java | 79 +-
.../presto/CarbondataTransactionHandle.java | 6 +-
.../org/apache/carbondata/presto/Types.java | 19 +-
.../presto/impl/CarbonLocalInputSplit.java | 103 +-
.../presto/impl/CarbonTableCacheModel.java | 24 +-
.../presto/impl/CarbonTableConfig.java | 68 +-
.../presto/impl/CarbonTableReader.java | 1088 +++++++++---------
.../GrtLtFilterProcessorTestCase.scala | 18 +-
25 files changed, 1906 insertions(+), 2073 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-carbondata git commit: [CARBONDATA-826] Create
carbondata-connector for query carbon data in presto
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 95895fc..85c53ad 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -56,241 +56,218 @@ import static java.util.Objects.requireNonNull;
public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
- private final String connectorId;
- private final CarbonTableReader carbonTableReader;
-
- @Inject
- public CarbondataRecordSetProvider(
- CarbondataConnectorId connectorId,
- CarbonTableReader reader)
- {
- //this.config = requireNonNull(config, "config is null");
- //this.connector = requireNonNull(connector, "connector is null");
- this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
- this.carbonTableReader = reader;
+ private final String connectorId;
+ private final CarbonTableReader carbonTableReader;
+
+ @Inject
+ public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+ //this.config = requireNonNull(config, "config is null");
+ //this.connector = requireNonNull(connector, "connector is null");
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = reader;
+ }
+
+ @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+ requireNonNull(split, "split is null");
+ requireNonNull(columns, "columns is null");
+
+ // Convert split
+ CarbondataSplit cdSplit =
+ checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+ checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+ // Convert all columns handles
+ ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
}
- @Override
- public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
- requireNonNull(split, "split is null");
- requireNonNull(columns, "columns is null");
-
- // Convert split
- CarbondataSplit cdSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
- checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
-
- // Convert all columns handles
- ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
- for (ColumnHandle handle : columns) {
- handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
- }
-
- // Build column projection(check the column order)
- String targetCols = "";
- for(ColumnHandle col : columns){
- targetCols += ((CarbondataColumnHandle)col).getColumnName() + ",";
- }
- targetCols = targetCols.substring(0, targetCols.length() -1 );
- //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
-
- CarbonTableCacheModel tableCacheModel = carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
- checkNotNull(tableCacheModel, "tableCacheModel should not be null");
- checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
- checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
-
- // Build Query Model
- CarbonTable targetTable = tableCacheModel.carbonTable;
- CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
- QueryModel queryModel = QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
-
- // Push down filter
- fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
-
- // Return new record set
- return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit, handles.build(), queryModel);
+ // Build column projection(check the column order)
+ String targetCols = "";
+ for (ColumnHandle col : columns) {
+ targetCols += ((CarbondataColumnHandle) col).getColumnName() + ",";
}
-
- // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
- private void fillFilter2QueryModel(QueryModel queryModel, TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
-
- //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
-
- //Build Predicate Expression
- ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
- Domain domain = null;
-
- for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
- // Build ColumnExpresstion for Expresstion(Carbondata)
- CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
- Type type = cdch.getColumnType();
-
- DataType coltype = Spi2CarbondataTypeMapper(type);
- Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
-
- domain = originalConstraint.getDomains().get().get(c);
- checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
- if (domain.getValues().isNone()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
- //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
- //new Expression()
- }
-
- if (domain.getValues().isAll()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
- //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
- }
-
- List<Object> singleValues = new ArrayList<>();
- List<Expression> rangeFilter = new ArrayList<>();
- for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
- checkState(!range.isAll()); // Already checked
- if (range.isSingleValue()) {
- singleValues.add(range.getLow().getValue());
- }
- else
- {
- List<String> rangeConjuncts = new ArrayList<>();
- if (!range.getLow().isLowerUnbounded()) {
- Object value = ConvertDataByType(range.getLow().getValue(), type);
- switch (range.getLow().getBound()) {
- case ABOVE:
- if (type == TimestampType.TIMESTAMP) {
- //todo not now
- } else {
- GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- }
- break;
- case EXACTLY:
- GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- break;
- case BELOW:
- throw new IllegalArgumentException("Low marker should never use BELOW bound");
- default:
- throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
- }
- }
- if (!range.getHigh().isUpperUnbounded()) {
- Object value = ConvertDataByType(range.getHigh().getValue(), type);
- switch (range.getHigh().getBound()) {
- case ABOVE:
- throw new IllegalArgumentException("High marker should never use ABOVE bound");
- case EXACTLY:
- LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //less.setRangeExpression(true);
- rangeFilter.add(less);
- break;
- case BELOW:
- LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
- //less2.setRangeExpression(true);
- rangeFilter.add(less2);
- break;
- default:
- throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
- }
- }
+ targetCols = targetCols.substring(0, targetCols.length() - 1);
+ //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+ CarbonTableCacheModel tableCacheModel =
+ carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+ checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+ checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+ checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+ // Build Query Model
+ CarbonTable targetTable = tableCacheModel.carbonTable;
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+ QueryModel queryModel =
+ QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+ // Push down filter
+ fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+ // Return new record set
+ return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+ handles.build(), queryModel);
+ }
+
+ // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+ private void fillFilter2QueryModel(QueryModel queryModel,
+ TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+ //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+ //Build Predicate Expression
+ ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+ Domain domain = null;
+
+ for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpression for Expression(Carbondata)
+ CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+ Type type = cdch.getColumnType();
+
+ DataType coltype = Spi2CarbondataTypeMapper(type);
+ Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+ domain = originalConstraint.getDomains().get().get(c);
+ checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+ if (domain.getValues().isNone()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+ //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+ //new Expression()
+ }
+
+ if (domain.getValues().isAll()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+ //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+ }
+
+ List<Object> singleValues = new ArrayList<>();
+ List<Expression> rangeFilter = new ArrayList<>();
+ for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+ checkState(!range.isAll()); // Already checked
+ if (range.isSingleValue()) {
+ singleValues.add(range.getLow().getValue());
+ } else {
+ List<String> rangeConjuncts = new ArrayList<>();
+ if (!range.getLow().isLowerUnbounded()) {
+ Object value = ConvertDataByType(range.getLow().getValue(), type);
+ switch (range.getLow().getBound()) {
+ case ABOVE:
+ if (type == TimestampType.TIMESTAMP) {
+ //todo not now
+ } else {
+ GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
}
+ break;
+ case EXACTLY:
+ GreaterThanEqualToExpression greater =
+ new GreaterThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ break;
+ case BELOW:
+ throw new IllegalArgumentException("Low marker should never use BELOW bound");
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
}
-
- if (singleValues.size() == 1) {
- Expression ex = null;
- if (coltype.equals(DataType.STRING)) {
- ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
- } else
- ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
- filters.add(ex);
- }
- else if(singleValues.size() > 1) {
- ListExpression candidates = null;
- List<Expression> exs = singleValues.stream().map((a) ->
- {
- return new LiteralExpression(ConvertDataByType(a, type), coltype);
- }).collect(Collectors.toList());
- candidates = new ListExpression(exs);
-
- if(candidates != null)
- filters.add(new InExpression(colExpression, candidates));
- }
- else if(rangeFilter.size() > 0){
- if(rangeFilter.size() > 1) {
- Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
- if(rangeFilter.size() > 2)
- {
- for(int i = 2; i< rangeFilter.size(); i++)
- {
- filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
- }
- }
- }
- else if(rangeFilter.size() == 1)
- filters.add(rangeFilter.get(0));
+ }
+ if (!range.getHigh().isUpperUnbounded()) {
+ Object value = ConvertDataByType(range.getHigh().getValue(), type);
+ switch (range.getHigh().getBound()) {
+ case ABOVE:
+ throw new IllegalArgumentException("High marker should never use ABOVE bound");
+ case EXACTLY:
+ LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //less.setRangeExpression(true);
+ rangeFilter.add(less);
+ break;
+ case BELOW:
+ LessThanExpression less2 =
+ new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+ //less2.setRangeExpression(true);
+ rangeFilter.add(less2);
+ break;
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
}
+ }
}
-
- Expression finalFilters;
- List<Expression> tmp = filters.build();
- if(tmp.size() > 1) {
- finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
- if(tmp.size() > 2)
- {
- for(int i = 2; i< tmp.size(); i++)
- {
- finalFilters = new AndExpression(finalFilters, tmp.get(i));
- }
+ }
+
+ if (singleValues.size() == 1) {
+ Expression ex = null;
+ if (coltype.equals(DataType.STRING)) {
+ ex = new EqualToExpression(colExpression,
+ new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+ } else ex = new EqualToExpression(colExpression,
+ new LiteralExpression(singleValues.get(0), coltype));
+ filters.add(ex);
+ } else if (singleValues.size() > 1) {
+ ListExpression candidates = null;
+ List<Expression> exs = singleValues.stream().map((a) -> {
+ return new LiteralExpression(ConvertDataByType(a, type), coltype);
+ }).collect(Collectors.toList());
+ candidates = new ListExpression(exs);
+
+ if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+ } else if (rangeFilter.size() > 0) {
+ if (rangeFilter.size() > 1) {
+ Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+ if (rangeFilter.size() > 2) {
+ for (int i = 2; i < rangeFilter.size(); i++) {
+ filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
}
- }
- else if(tmp.size() == 1)
- finalFilters = tmp.get(0);
- else
- return;
-
- // todo set into QueryModel
- CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
- queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
- }
-
- public static DataType Spi2CarbondataTypeMapper(Type colType)
- {
- if(colType == BooleanType.BOOLEAN)
- return DataType.BOOLEAN;
- else if(colType == SmallintType.SMALLINT)
- return DataType.SHORT;
- else if(colType == IntegerType.INTEGER)
- return DataType.INT;
- else if(colType == BigintType.BIGINT)
- return DataType.LONG;
- else if(colType == DoubleType.DOUBLE)
- return DataType.DOUBLE;
- else if(colType == DecimalType.createDecimalType())
- return DataType.DECIMAL;
- else if(colType == VarcharType.VARCHAR)
- return DataType.STRING;
- else if(colType == DateType.DATE)
- return DataType.DATE;
- else if(colType == TimestampType.TIMESTAMP)
- return DataType.TIMESTAMP;
- else
- return DataType.STRING;
+ }
+ } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0));
+ }
}
-
- public Object ConvertDataByType(Object rawdata, Type type)
- {
- if(type.equals(IntegerType.INTEGER))
- return new Integer((rawdata.toString()));
- else if(type.equals(BigintType.BIGINT))
- return (Long)rawdata;
- else if(type.equals(VarcharType.VARCHAR))
- return ((Slice)rawdata).toStringUtf8();
- else if(type.equals(BooleanType.BOOLEAN))
- return (Boolean)(rawdata);
-
- return rawdata;
- }
+ Expression finalFilters;
+ List<Expression> tmp = filters.build();
+ if (tmp.size() > 1) {
+ finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+ if (tmp.size() > 2) {
+ for (int i = 2; i < tmp.size(); i++) {
+ finalFilters = new AndExpression(finalFilters, tmp.get(i));
+ }
+ }
+ } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+ else return;
+
+ // todo set into QueryModel
+ CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+ queryModel.setFilterExpressionResolverTree(
+ CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+ }
+
+ public static DataType Spi2CarbondataTypeMapper(Type colType) {
+ if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+ else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+ else if (colType == IntegerType.INTEGER) return DataType.INT;
+ else if (colType == BigintType.BIGINT) return DataType.LONG;
+ else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+ else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+ else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+ else if (colType == DateType.DATE) return DataType.DATE;
+ else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+ else return DataType.STRING;
+ }
+
+ public Object ConvertDataByType(Object rawdata, Type type) {
+ if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+ else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+ else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+ else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+ return rawdata;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
index 741dfcc..ecc41ef 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
@@ -33,64 +33,56 @@ import static java.util.Objects.requireNonNull;
public class CarbondataSplit implements ConnectorSplit {
- private final String connectorId;
- private final SchemaTableName schemaTableName;
- private final TupleDomain<ColumnHandle> constraints;
- private final CarbonLocalInputSplit localInputSplit;
- private final List<CarbondataColumnConstraint> rebuildConstraints;
- private final ImmutableList<HostAddress> addresses;
-
- @JsonCreator
- public CarbondataSplit( @JsonProperty("connectorId") String connectorId,
- @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
- @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
- @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
- @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
- this.connectorId = requireNonNull(connectorId, "connectorId is null");
- this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
- this.constraints = requireNonNull(constraints, "constraints is null");
- this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
- this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
- this.addresses = ImmutableList.of();
- }
-
-
- @JsonProperty
- public String getConnectorId() {
- return connectorId;
- }
-
- @JsonProperty
- public SchemaTableName getSchemaTableName(){
- return schemaTableName;
- }
-
- @JsonProperty
- public TupleDomain<ColumnHandle> getConstraints() {
- return constraints;
- }
-
- @JsonProperty
- public CarbonLocalInputSplit getLocalInputSplit(){return localInputSplit;}
-
- @JsonProperty
- public List<CarbondataColumnConstraint> getRebuildConstraints() {
- return rebuildConstraints;
- }
-
- @Override
- public boolean isRemotelyAccessible() {
- return true;
- }
-
- @Override
- public List<HostAddress> getAddresses() {
- return addresses;
- }
-
- @Override
- public Object getInfo() {
- return this;
- }
+ private final String connectorId;
+ private final SchemaTableName schemaTableName;
+ private final TupleDomain<ColumnHandle> constraints;
+ private final CarbonLocalInputSplit localInputSplit;
+ private final List<CarbondataColumnConstraint> rebuildConstraints;
+ private final ImmutableList<HostAddress> addresses;
+
+ @JsonCreator public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+ @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
+ @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
+ @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
+ this.constraints = requireNonNull(constraints, "constraints is null");
+ this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
+ this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
+ this.addresses = ImmutableList.of();
+ }
+
+ @JsonProperty public String getConnectorId() {
+ return connectorId;
+ }
+
+ @JsonProperty public SchemaTableName getSchemaTableName() {
+ return schemaTableName;
+ }
+
+ @JsonProperty public TupleDomain<ColumnHandle> getConstraints() {
+ return constraints;
+ }
+
+ @JsonProperty public CarbonLocalInputSplit getLocalInputSplit() {
+ return localInputSplit;
+ }
+
+ @JsonProperty public List<CarbondataColumnConstraint> getRebuildConstraints() {
+ return rebuildConstraints;
+ }
+
+ @Override public boolean isRemotelyAccessible() {
+ return true;
+ }
+
+ @Override public List<HostAddress> getAddresses() {
+ return addresses;
+ }
+
+ @Override public Object getInfo() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index 86390e3..a8902eb 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -50,255 +50,224 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-public class CarbondataSplitManager
- implements ConnectorSplitManager
-{
-
- private final String connectorId;
- private final CarbonTableReader carbonTableReader;
-
- @Inject
- public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
- {
- this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
- this.carbonTableReader = requireNonNull(reader, "client is null");
- }
-
- public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
- {
- CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout;
- CarbondataTableHandle tableHandle = layoutHandle.getTable();
- SchemaTableName key = tableHandle.getSchemaTableName();
-
- //get all filter domain
- List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
-
- CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
- Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
-
- if(cache != null) {
- try {
- List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
-
- ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
- for (CarbonLocalInputSplit split : splits) {
- cSplits.add(new CarbondataSplit(
- connectorId,
- tableHandle.getSchemaTableName(),
- layoutHandle.getConstraint(),
- split,
- rebuildConstraints
- ));
- }
- return new FixedSplitSource(cSplits.build());
- } catch (Exception ex) {
- System.out.println(ex.toString());
- }
+public class CarbondataSplitManager implements ConnectorSplitManager {
+
+ private final String connectorId;
+ private final CarbonTableReader carbonTableReader;
+
+ @Inject
+ public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = requireNonNull(reader, "client is null");
+ }
+
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session, ConnectorTableLayoutHandle layout) {
+ CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+ CarbondataTableHandle tableHandle = layoutHandle.getTable();
+ SchemaTableName key = tableHandle.getSchemaTableName();
+
+ //get all filter domain
+ List<CarbondataColumnConstraint> rebuildConstraints =
+ getColumnConstraints(layoutHandle.getConstraint());
+
+ CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+ Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+ if (cache != null) {
+ try {
+ List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
+
+ ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+ for (CarbonLocalInputSplit split : splits) {
+ cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
+ layoutHandle.getConstraint(), split, rebuildConstraints));
}
- return null;
+ return new FixedSplitSource(cSplits.build());
+ } catch (Exception ex) {
+ System.out.println(ex.toString());
+ }
}
-
-
- public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
- {
- ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
- for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) {
- CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
- constraintBuilder.add(new CarbondataColumnConstraint(
- columnHandle.getColumnName(),
- Optional.of(columnDomain.getDomain()),
- columnHandle.isInvertedIndex()));
- }
-
- return constraintBuilder.build();
+ return null;
+ }
+
+ public List<CarbondataColumnConstraint> getColumnConstraints(
+ TupleDomain<ColumnHandle> constraint) {
+ ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+ for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
+ .get()) {
+ CarbondataColumnHandle columnHandle =
+ checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+ constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
+ Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
}
-
- public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
- {
- ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
- Domain domain = null;
-
- for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
- CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
- Type type = cdch.getColumnType();
-
- List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
- Optional<CarbonColumn> target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
-
- if(target.get() == null)
- return null;
-
- DataType coltype = target.get().getDataType();
- ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
- //colExpression.setColIndex(cs.getSchemaOrdinal());
- colExpression.setDimension(target.get().isDimesion());
- colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
- colExpression.setCarbonColumn(target.get());
-
- domain = originalConstraint.getDomains().get().get(c);
- checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
- if (domain.getValues().isNone()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
- //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
- //new Expression()
- }
-
- if (domain.getValues().isAll()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
- //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
- }
-
- List<Object> singleValues = new ArrayList<>();
- List<Expression> rangeFilter = new ArrayList<>();
- for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
- checkState(!range.isAll()); // Already checked
- if (range.isSingleValue()) {
- singleValues.add(range.getLow().getValue());
- }
- else
- {
- List<String> rangeConjuncts = new ArrayList<>();
- if (!range.getLow().isLowerUnbounded()) {
- Object value = ConvertDataByType(range.getLow().getValue(), type);
- switch (range.getLow().getBound()) {
- case ABOVE:
- if (type == TimestampType.TIMESTAMP) {
- //todo not now
- } else {
- GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- }
- break;
- case EXACTLY:
- GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- break;
- case BELOW:
- throw new IllegalArgumentException("Low marker should never use BELOW bound");
- default:
- throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
- }
- }
- if (!range.getHigh().isUpperUnbounded()) {
- Object value = ConvertDataByType(range.getHigh().getValue(), type);
- switch (range.getHigh().getBound()) {
- case ABOVE:
- throw new IllegalArgumentException("High marker should never use ABOVE bound");
- case EXACTLY:
- LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //less.setRangeExpression(true);
- rangeFilter.add(less);
- break;
- case BELOW:
- LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
- //less2.setRangeExpression(true);
- rangeFilter.add(less2);
- break;
- default:
- throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
- }
- }
+ return constraintBuilder.build();
+ }
+
+ public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
+ CarbonTable carbonTable) {
+ ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+ Domain domain = null;
+
+ for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+ CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+ Type type = cdch.getColumnType();
+
+ List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+ Optional<CarbonColumn> target =
+ ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+
+ if (target.get() == null) return null;
+
+ DataType coltype = target.get().getDataType();
+ ColumnExpression colExpression =
+ new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
+ //colExpression.setColIndex(cs.getSchemaOrdinal());
+ colExpression.setDimension(target.get().isDimesion());
+ colExpression.setDimension(
+ carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+ colExpression.setCarbonColumn(target.get());
+
+ domain = originalConstraint.getDomains().get().get(c);
+ checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+ if (domain.getValues().isNone()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+ //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+ //new Expression()
+ }
+
+ if (domain.getValues().isAll()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+ //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+ }
+
+ List<Object> singleValues = new ArrayList<>();
+ List<Expression> rangeFilter = new ArrayList<>();
+ for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+ checkState(!range.isAll()); // Already checked
+ if (range.isSingleValue()) {
+ singleValues.add(range.getLow().getValue());
+ } else {
+ List<String> rangeConjuncts = new ArrayList<>();
+ if (!range.getLow().isLowerUnbounded()) {
+ Object value = ConvertDataByType(range.getLow().getValue(), type);
+ switch (range.getLow().getBound()) {
+ case ABOVE:
+ if (type == TimestampType.TIMESTAMP) {
+ //todo not now
+ } else {
+ GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
}
+ break;
+ case EXACTLY:
+ GreaterThanEqualToExpression greater =
+ new GreaterThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ break;
+ case BELOW:
+ throw new IllegalArgumentException("Low marker should never use BELOW bound");
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
}
-
- if (singleValues.size() == 1) {
- Expression ex = null;
- if (coltype.equals(DataType.STRING)) {
- ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
- } else
- ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
- filters.add(ex);
- }
- else if(singleValues.size() > 1) {
- ListExpression candidates = null;
- List<Expression> exs = singleValues.stream().map((a) ->
- {
- return new LiteralExpression(ConvertDataByType(a, type), coltype);
- }).collect(Collectors.toList());
- candidates = new ListExpression(exs);
-
- if(candidates != null)
- filters.add(new InExpression(colExpression, candidates));
- }
- else if(rangeFilter.size() > 0){
- if(rangeFilter.size() > 1) {
- Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
- if(rangeFilter.size() > 2)
- {
- for(int i = 2; i< rangeFilter.size(); i++)
- {
- filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
- }
- }
- }
- else if(rangeFilter.size() == 1)//only have one value
- filters.add(rangeFilter.get(0));
+ }
+ if (!range.getHigh().isUpperUnbounded()) {
+ Object value = ConvertDataByType(range.getHigh().getValue(), type);
+ switch (range.getHigh().getBound()) {
+ case ABOVE:
+ throw new IllegalArgumentException("High marker should never use ABOVE bound");
+ case EXACTLY:
+ LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //less.setRangeExpression(true);
+ rangeFilter.add(less);
+ break;
+ case BELOW:
+ LessThanExpression less2 =
+ new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+ //less2.setRangeExpression(true);
+ rangeFilter.add(less2);
+ break;
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
}
+ }
}
-
- Expression finalFilters;
- List<Expression> tmp = filters.build();
- if(tmp.size() > 1) {
- finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
- if(tmp.size() > 2)
- {
- for(int i = 2; i< tmp.size(); i++)
- {
- finalFilters = new AndExpression(finalFilters, tmp.get(i));
- }
+ }
+
+ if (singleValues.size() == 1) {
+ Expression ex = null;
+ if (coltype.equals(DataType.STRING)) {
+ ex = new EqualToExpression(colExpression,
+ new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+ } else ex = new EqualToExpression(colExpression,
+ new LiteralExpression(singleValues.get(0), coltype));
+ filters.add(ex);
+ } else if (singleValues.size() > 1) {
+ ListExpression candidates = null;
+ List<Expression> exs = singleValues.stream().map((a) -> {
+ return new LiteralExpression(ConvertDataByType(a, type), coltype);
+ }).collect(Collectors.toList());
+ candidates = new ListExpression(exs);
+
+ if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+ } else if (rangeFilter.size() > 0) {
+ if (rangeFilter.size() > 1) {
+ Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+ if (rangeFilter.size() > 2) {
+ for (int i = 2; i < rangeFilter.size(); i++) {
+ filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
}
- }
- else if(tmp.size() == 1)
- finalFilters = tmp.get(0);
- else//no filter
- return null;
-
- return finalFilters;
- }
-
- public static DataType Spi2CarbondataTypeMapper(Type colType)
- {
- if(colType == BooleanType.BOOLEAN)
- return DataType.BOOLEAN;
- else if(colType == SmallintType.SMALLINT)
- return DataType.SHORT;
- else if(colType == IntegerType.INTEGER)
- return DataType.INT;
- else if(colType == BigintType.BIGINT)
- return DataType.LONG;
- else if(colType == DoubleType.DOUBLE)
- return DataType.DOUBLE;
- else if(colType == DecimalType.createDecimalType())
- return DataType.DECIMAL;
- else if(colType == VarcharType.VARCHAR)
- return DataType.STRING;
- else if(colType == DateType.DATE)
- return DataType.DATE;
- else if(colType == TimestampType.TIMESTAMP)
- return DataType.TIMESTAMP;
- else
- return DataType.STRING;
+ }
+ } else if (rangeFilter.size() == 1)//only have one value
+ filters.add(rangeFilter.get(0));
+ }
}
-
- public Object ConvertDataByType(Object rawdata, Type type)
- {
- if(type.equals(IntegerType.INTEGER))
- return new Integer((rawdata.toString()));
- else if(type.equals(BigintType.BIGINT))
- return (Long)rawdata;
- else if(type.equals(VarcharType.VARCHAR))
- return ((Slice)rawdata).toStringUtf8();
- else if(type.equals(BooleanType.BOOLEAN))
- return (Boolean)(rawdata);
-
- return rawdata;
- }
+ Expression finalFilters;
+ List<Expression> tmp = filters.build();
+ if (tmp.size() > 1) {
+ finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+ if (tmp.size() > 2) {
+ for (int i = 2; i < tmp.size(); i++) {
+ finalFilters = new AndExpression(finalFilters, tmp.get(i));
+ }
+ }
+ } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+ else//no filter
+ return null;
+
+ return finalFilters;
+ }
+
+ public static DataType Spi2CarbondataTypeMapper(Type colType) {
+ if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+ else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+ else if (colType == IntegerType.INTEGER) return DataType.INT;
+ else if (colType == BigintType.BIGINT) return DataType.LONG;
+ else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+ else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+ else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+ else if (colType == DateType.DATE) return DataType.DATE;
+ else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+ else return DataType.STRING;
+ }
+
+ public Object ConvertDataByType(Object rawdata, Type type) {
+ if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+ else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+ else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+ else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+ return rawdata;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
index b0caf52..0a3c820 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
@@ -28,57 +28,44 @@ import java.util.Objects;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
-public class CarbondataTableHandle
- implements ConnectorTableHandle {
+public class CarbondataTableHandle implements ConnectorTableHandle {
- private final String connectorId;
- private final SchemaTableName schemaTableName;
+ private final String connectorId;
+ private final SchemaTableName schemaTableName;
- @JsonCreator
- public CarbondataTableHandle(
- @JsonProperty("connectorId") String connectorId,
- @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
- {
- this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
- this.schemaTableName = schemaTableName;
- }
-
- @JsonProperty
- public String getConnectorId()
- {
- return connectorId;
- }
+ @JsonCreator public CarbondataTableHandle(@JsonProperty("connectorId") String connectorId,
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName) {
+ this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
+ this.schemaTableName = schemaTableName;
+ }
- @JsonProperty
- public SchemaTableName getSchemaTableName()
- {
- return schemaTableName;
- }
+ @JsonProperty public String getConnectorId() {
+ return connectorId;
+ }
- @Override
- public int hashCode()
- {
- return Objects.hash(connectorId, schemaTableName);
- }
+ @JsonProperty public SchemaTableName getSchemaTableName() {
+ return schemaTableName;
+ }
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if ((obj == null) || (getClass() != obj.getClass())) {
- return false;
- }
+ @Override public int hashCode() {
+ return Objects.hash(connectorId, schemaTableName);
+ }
- CarbondataTableHandle other = (CarbondataTableHandle) obj;
- return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName.equals(other.getSchemaTableName());
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
}
-
- @Override
- public String toString()
- {
- return Joiner.on(":").join(connectorId, schemaTableName.toString());
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
}
+ CarbondataTableHandle other = (CarbondataTableHandle) obj;
+ return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName
+ .equals(other.getSchemaTableName());
+ }
+
+ @Override public String toString() {
+ return Joiner.on(":").join(connectorId, schemaTableName.toString());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
index dc0506f..bf6318f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
@@ -29,60 +29,43 @@ import java.util.Objects;
import static com.google.common.base.Objects.toStringHelper;
import static java.util.Objects.requireNonNull;
-public class CarbondataTableLayoutHandle
- implements ConnectorTableLayoutHandle
-{
- private final CarbondataTableHandle table;
- private final TupleDomain<ColumnHandle> constraint;
+public class CarbondataTableLayoutHandle implements ConnectorTableLayoutHandle {
+ private final CarbondataTableHandle table;
+ private final TupleDomain<ColumnHandle> constraint;
- @JsonCreator
- public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
- @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
- {
- this.table = requireNonNull(table, "table is null");
- this.constraint = requireNonNull(constraint, "constraint is null");
- }
-
- @JsonProperty
- public CarbondataTableHandle getTable()
- {
- return table;
- }
-
- @JsonProperty
- public TupleDomain<ColumnHandle> getConstraint()
- {
- return constraint;
- }
+ @JsonCreator
+ public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
+ @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint) {
+ this.table = requireNonNull(table, "table is null");
+ this.constraint = requireNonNull(constraint, "constraint is null");
+ }
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
+ @JsonProperty public CarbondataTableHandle getTable() {
+ return table;
+ }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
+ @JsonProperty public TupleDomain<ColumnHandle> getConstraint() {
+ return constraint;
+ }
- CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
- return Objects.equals(table, other.table)
- && Objects.equals(constraint, other.constraint);
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
}
- @Override
- public int hashCode()
- {
- return Objects.hash(table, constraint);
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
}
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("table", table)
- .add("constraint", constraint)
- .toString();
- }
+ CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
+ return Objects.equals(table, other.table) && Objects.equals(constraint, other.constraint);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(table, constraint);
+ }
+
+ @Override public String toString() {
+ return toStringHelper(this).add("table", table).add("constraint", constraint).toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
index 06a84e2..e95c490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.presto;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-public enum CarbondataTransactionHandle
- implements ConnectorTransactionHandle
-{
- INSTANCE
+public enum CarbondataTransactionHandle implements ConnectorTransactionHandle {
+ INSTANCE
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java b/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
index b7b0d90..cb30907 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
@@ -23,16 +23,13 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
public class Types {
- private Types() {}
+ private Types() {
+ }
- public static <A, B extends A> B checkType(A value, Class<B> target, String name)
- {
- requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
- checkArgument(target.isInstance(value),
- "%s must be of type %s, not %s",
- name,
- target.getName(),
- value.getClass().getName());
- return target.cast(value);
- }
+ public static <A, B extends A> B checkType(A value, Class<B> target, String name) {
+ requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
+ checkArgument(target.isInstance(value), "%s must be of type %s, not %s", name, target.getName(),
+ value.getClass().getName());
+ return target.cast(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 9cde7a6..9940061 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -24,70 +24,59 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class CarbonLocalInputSplit {
- private static final long serialVersionUID = 3520344046772190207L;
- private String segmentId;
- private String path;
- private long start;
- private long length;
- private List<String> locations;
- private short version;
- /**
- * Number of BlockLets in a block
- */
- private int numberOfBlocklets = 0;
+ private static final long serialVersionUID = 3520344046772190207L;
+ private String segmentId;
+ private String path;
+ private long start;
+ private long length;
+ private List<String> locations;
+ private short version;
+ /**
+ * Number of BlockLets in a block
+ */
+ private int numberOfBlocklets = 0;
+ @JsonProperty public short getVersion() {
+ return version;
+ }
- @JsonProperty
- public short getVersion(){
- return version;
- }
+ @JsonProperty public List<String> getLocations() {
+ return locations;
+ }
- @JsonProperty
- public List<String> getLocations() {
- return locations;
- }
+ @JsonProperty public long getLength() {
+ return length;
+ }
- @JsonProperty
- public long getLength() {
- return length;
- }
+ @JsonProperty public long getStart() {
+ return start;
+ }
- @JsonProperty
- public long getStart() {
- return start;
- }
+ @JsonProperty public String getPath() {
+ return path;
+ }
- @JsonProperty
- public String getPath() {
- return path;
- }
+ @JsonProperty public String getSegmentId() {
+ return segmentId;
+ }
- @JsonProperty
- public String getSegmentId() {
- return segmentId;
- }
+ @JsonProperty public int getNumberOfBlocklets() {
+ return numberOfBlocklets;
+ }
- @JsonProperty
- public int getNumberOfBlocklets() {
- return numberOfBlocklets;
- }
-
- @JsonCreator
- public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
- @JsonProperty("path") String path,
- @JsonProperty("start") long start,
- @JsonProperty("length") long length,
- @JsonProperty("locations") List<String> locations,
- @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
+ @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+ @JsonProperty("path") String path, @JsonProperty("start") long start,
+ @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
+ @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
@JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
- @JsonProperty("version") short version) {
- this.path = path;
- this.start = start;
- this.length = length;
- this.segmentId = segmentId;
- this.locations = locations;
- this.numberOfBlocklets = numberOfBlocklets;
- //this.tableBlockInfo = tableBlockInfo;
- this.version = version;
- }
+ @JsonProperty("version") short version) {
+ this.path = path;
+ this.start = start;
+ this.length = length;
+ this.segmentId = segmentId;
+ this.locations = locations;
+ this.numberOfBlocklets = numberOfBlocklets;
+ //this.tableBlockInfo = tableBlockInfo;
+ this.version = version;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index ee636b1..b138f18 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -24,21 +24,17 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
public class CarbonTableCacheModel {
- public CarbonTableIdentifier carbonTableIdentifier;
- public CarbonTablePath carbonTablePath;
+ public CarbonTableIdentifier carbonTableIdentifier;
+ public CarbonTablePath carbonTablePath;
- public TableInfo tableInfo;
- public CarbonTable carbonTable;
- public String[] segments;
+ public TableInfo tableInfo;
+ public CarbonTable carbonTable;
+ public String[] segments;
- public boolean isValid()
- {
- if(carbonTable != null
- && carbonTablePath != null
- && carbonTableIdentifier != null)
- return true;
- else
- return false;
- }
+ public boolean isValid() {
+ if (carbonTable != null && carbonTablePath != null && carbonTableIdentifier != null)
+ return true;
+ else return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index a682b66..a2b0a8c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -22,41 +22,35 @@ import io.airlift.configuration.Config;
import javax.validation.constraints.NotNull;
public class CarbonTableConfig {
- //read from config
- private String dbPtah;
- private String tablePath;
- private String storePath;
-
- @NotNull
- public String getDbPtah() {
- return dbPtah;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setDbPtah(String dbPtah) {
- this.dbPtah = dbPtah;
- return this;
- }
-
- @NotNull
- public String getTablePath() {
- return tablePath;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setTablePath(String tablePath) {
- this.tablePath = tablePath;
- return this;
- }
-
- @NotNull
- public String getStorePath() {
- return storePath;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setStorePath(String storePath) {
- this.storePath = storePath;
- return this;
- }
+ //read from config
+ private String dbPtah;
+ private String tablePath;
+ private String storePath;
+
+ @NotNull public String getDbPtah() {
+ return dbPtah;
+ }
+
+ @Config("carbondata-store") public CarbonTableConfig setDbPtah(String dbPtah) {
+ this.dbPtah = dbPtah;
+ return this;
+ }
+
+ @NotNull public String getTablePath() {
+ return tablePath;
+ }
+
+ @Config("carbondata-store") public CarbonTableConfig setTablePath(String tablePath) {
+ this.tablePath = tablePath;
+ return this;
+ }
+
+ @NotNull public String getStorePath() {
+ return storePath;
+ }
+
+ @Config("carbondata-store") public CarbonTableConfig setStorePath(String storePath) {
+ this.storePath = storePath;
+ return this;
+ }
}
[3/4] incubator-carbondata git commit: [CARBONDATA-826] Create
carbondata-connector for query carbon data in presto
Posted by ch...@apache.org.
[CARBONDATA-826] Create carbondata-connector for query carbon data in presto
fix guava version in pom
add readme for presto integration
add readme for presto integration
for travis ci
for travis ci
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/27123300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/27123300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/27123300
Branch: refs/heads/master
Commit: 27123300794228e36e6f4f5887b9087b174c1bb3
Parents: 08a6e81
Author: chenliang613 <ch...@huawei.com>
Authored: Mon Mar 27 16:40:20 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Wed Mar 29 14:34:26 2017 +0530
----------------------------------------------------------------------
integration/presto/README.md | 86 ++
integration/presto/pom.xml | 22 +-
.../presto/CarbondataColumnConstraint.java | 101 +-
.../presto/CarbondataColumnHandle.java | 205 ++--
.../carbondata/presto/CarbondataConnector.java | 87 +-
.../presto/CarbondataConnectorFactory.java | 119 +-
.../presto/CarbondataConnectorId.java | 49 +-
.../presto/CarbondataHandleResolver.java | 36 +-
.../carbondata/presto/CarbondataMetadata.java | 427 ++++---
.../carbondata/presto/CarbondataModule.java | 63 +-
.../carbondata/presto/CarbondataPlugin.java | 14 +-
.../presto/CarbondataRecordCursor.java | 190 ++-
.../carbondata/presto/CarbondataRecordSet.java | 110 +-
.../presto/CarbondataRecordSetProvider.java | 431 ++++---
.../carbondata/presto/CarbondataSplit.java | 110 +-
.../presto/CarbondataSplitManager.java | 449 ++++----
.../presto/CarbondataTableHandle.java | 75 +-
.../presto/CarbondataTableLayoutHandle.java | 79 +-
.../presto/CarbondataTransactionHandle.java | 6 +-
.../org/apache/carbondata/presto/Types.java | 19 +-
.../presto/impl/CarbonLocalInputSplit.java | 103 +-
.../presto/impl/CarbonTableCacheModel.java | 24 +-
.../presto/impl/CarbonTableConfig.java | 68 +-
.../presto/impl/CarbonTableReader.java | 1088 +++++++++---------
.../GrtLtFilterProcessorTestCase.scala | 18 +-
25 files changed, 1906 insertions(+), 2073 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
new file mode 100644
index 0000000..8a7cd13
--- /dev/null
+++ b/integration/presto/README.md
@@ -0,0 +1,86 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+Please follow the steps below to query CarbonData in Presto
+
+### Configure the Presto server
+* Download Presto server 0.166: https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
+* Finish configuration as per https://prestodb.io/docs/current/installation/deployment.html
+ for example:
+ ```
+ carbondata.properties:
+ connector.name=carbondata
+ carbondata-store=/Users/apple/DEMO/presto_test/data
+
+ config.properties:
+ coordinator=true
+ node-scheduler.include-coordinator=true
+ http-server.http.port=8086
+ query.max-memory=5GB
+ query.max-memory-per-node=1GB
+ discovery-server.enabled=true
+ discovery.uri=http://localhost:8086
+
+ jvm.config:
+ -server
+ -Xmx4G
+ -XX:+UseG1GC
+ -XX:G1HeapRegionSize=32M
+ -XX:+UseGCOverheadLimit
+ -XX:+ExplicitGCInvokesConcurrent
+ -XX:+HeapDumpOnOutOfMemoryError
+ -XX:OnOutOfMemoryError=kill -9 %p
+ -XX:+TraceClassLoading
+
+ log.properties:
+ com.facebook.presto=DEBUG
+ com.facebook.presto.server.PluginManager=DEBUG
+
+ node.properties:
+ node.environment=carbondata
+ node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
+ node.data-dir=/Users/apple/DEMO/presto_test/data
+ ```
+* Configure the carbondata-connector for Presto
+
+ First: compile the carbondata-presto integration module
+ ```
+ $ git clone https://github.com/apache/incubator-carbondata
+ $ cd incubator-carbondata/integration/presto
+ $ mvn clean package
+ ```
+ Second: create a folder named "carbondata" under ./presto-server-0.166/plugin
+ Third: copy all jars from ./incubator-carbondata/integration/presto/target/carbondata-presto-1.1.0-incubating-SNAPSHOT
+ to ./presto-server-0.166/plugin/carbondata
+
+### Generate CarbonData file
+
+Please refer to quick start : https://github.com/apache/incubator-carbondata/blob/master/docs/quick-start-guide.md
+
+### Query carbondata in CLI of presto
+* Download presto-cli-0.166-executable.jar
+
+* Start CLI:
+
+ ```
+ $ ./presto-cli-0.166-executable.jar --server localhost:8086 --catalog carbondata --schema default
+ ```
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 5493df1..15e31fa 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -28,6 +28,7 @@
<artifactId>carbondata-presto</artifactId>
<name>Apache CarbonData :: presto</name>
+ <packaging>presto-plugin</packaging>
<properties>
<dev.path>${basedir}/../../dev</dev.path>
@@ -68,7 +69,7 @@
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
<version>0.144</version>
- <scope>provided</scope>
+ <!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -81,14 +82,14 @@
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<version>0.144</version>
- <scope>provided</scope>
+ <!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<version>0.144</version>
- <scope>provided</scope>
+ <!--<scope>provided</scope>-->
</dependency>
<dependency>
@@ -115,7 +116,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>14.0.1</version>
+ <version>18.0</version>
</dependency>
<dependency>
@@ -129,7 +130,7 @@
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.166</version>
- <!--<scope>provided</scope>-->
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -168,7 +169,7 @@
<failIfNoTests>false</failIfNoTests>
</configuration>
</plugin>
- <!--
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
@@ -213,12 +214,19 @@
</plugin>
<plugin>
+ <groupId>io.takari.maven.plugins</groupId>
+ <artifactId>presto-maven-plugin</artifactId>
+ <version>0.1.12</version>
+ <extensions>true</extensions>
+ </plugin>
+
+ <plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
- </plugin>-->
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
index aad378e..0665aed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -30,75 +30,54 @@ import static com.google.common.base.Objects.toStringHelper;
import static java.util.Objects.requireNonNull;
public class CarbondataColumnConstraint {
- private final String name;
- private final boolean invertedindexed;
- private Optional<Domain> domain;
+ private final String name;
+ private final boolean invertedindexed;
+ private Optional<Domain> domain;
- @JsonCreator
- public CarbondataColumnConstraint(
- @JsonProperty("name") String name,
- @JsonProperty("domain") Optional<Domain> domain,
- @JsonProperty("invertedindexed") boolean invertedindexed)
- {
- this.name = requireNonNull(name, "name is null");
- this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
- this.domain = requireNonNull(domain, "domain is null");
- }
-
- @JsonProperty
- public boolean isInvertedindexed()
- {
- return invertedindexed;
- }
-
- @JsonProperty
- public String getName()
- {
- return name;
- }
+ @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
+ @JsonProperty("domain") Optional<Domain> domain,
+ @JsonProperty("invertedindexed") boolean invertedindexed) {
+ this.name = requireNonNull(name, "name is null");
+ this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+ this.domain = requireNonNull(domain, "domain is null");
+ }
- @JsonProperty
- public Optional<Domain> getDomain()
- {
- return domain;
- }
+ @JsonProperty public boolean isInvertedindexed() {
+ return invertedindexed;
+ }
- @JsonSetter
- public void setDomain(Optional<Domain> domain)
- {
- this.domain = domain;
- }
+ @JsonProperty public String getName() {
+ return name;
+ }
- @Override
- public int hashCode()
- {
- return Objects.hash(name, domain, invertedindexed);
- }
+ @JsonProperty public Optional<Domain> getDomain() {
+ return domain;
+ }
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
+ @JsonSetter public void setDomain(Optional<Domain> domain) {
+ this.domain = domain;
+ }
- if ((obj == null) || (getClass() != obj.getClass())) {
- return false;
- }
+ @Override public int hashCode() {
+ return Objects.hash(name, domain, invertedindexed);
+ }
- CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
- return Objects.equals(this.name, other.name)
- && Objects.equals(this.domain, other.domain)
- && Objects.equals(this.invertedindexed, other.invertedindexed);
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
}
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("name", this.name)
- .add("invertedindexed", this.invertedindexed)
- .add("domain", this.domain)
- .toString();
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
}
+
+ CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+ return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
+ && Objects.equals(this.invertedindexed, other.invertedindexed);
+ }
+
+ @Override public String toString() {
+ return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedindexed)
+ .add("domain", this.domain).toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index 2e3aef5..972a59c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -29,136 +29,111 @@ import java.util.Objects;
import static com.google.common.base.Objects.toStringHelper;
import static java.util.Objects.requireNonNull;
-public class CarbondataColumnHandle
- implements ColumnHandle
-{
- private final String connectorId;
- private final String columnName;
-
- public boolean isInvertedIndex() {
- return isInvertedIndex;
- }
+public class CarbondataColumnHandle implements ColumnHandle {
+ private final String connectorId;
+ private final String columnName;
- private final Type columnType;
- private final int ordinalPosition;
- private final int keyOrdinal;
- private final int columnGroupOrdinal;
+ public boolean isInvertedIndex() {
+ return isInvertedIndex;
+ }
- private final int columnGroupId;
- private final String columnUniqueId;
- private final boolean isInvertedIndex;
+ private final Type columnType;
+ private final int ordinalPosition;
+ private final int keyOrdinal;
+ private final int columnGroupOrdinal;
- public boolean isMeasure() {
- return isMeasure;
- }
+ private final int columnGroupId;
+ private final String columnUniqueId;
+ private final boolean isInvertedIndex;
- private final boolean isMeasure;
+ public boolean isMeasure() {
+ return isMeasure;
+ }
- public int getKeyOrdinal() {
- return keyOrdinal;
- }
+ private final boolean isMeasure;
- public int getColumnGroupOrdinal() {
- return columnGroupOrdinal;
- }
+ public int getKeyOrdinal() {
+ return keyOrdinal;
+ }
- public int getColumnGroupId() {
- return columnGroupId;
- }
+ public int getColumnGroupOrdinal() {
+ return columnGroupOrdinal;
+ }
- public String getColumnUniqueId() {
- return columnUniqueId;
- }
+ public int getColumnGroupId() {
+ return columnGroupId;
+ }
+
+ public String getColumnUniqueId() {
+ return columnUniqueId;
+ }
/* ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1 from tablename)
The columnhandle of clm3 : has ordinalposition = 3
*/
- @JsonCreator
- public CarbondataColumnHandle(
- @JsonProperty("connectorId") String connectorId,
- @JsonProperty("columnName") String columnName,
- @JsonProperty("columnType") Type columnType,
- @JsonProperty("ordinalPosition") int ordinalPosition,
- @JsonProperty("keyOrdinal") int keyOrdinal,
- @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
- @JsonProperty("isMeasure") boolean isMeasure,
- @JsonProperty("columnGroupId") int columnGroupId,
- @JsonProperty("columnUniqueId") String columnUniqueId,
- @JsonProperty("isInvertedIndex") boolean isInvertedIndex)
- {
- this.connectorId = requireNonNull(connectorId, "connectorId is null");
- this.columnName = requireNonNull(columnName, "columnName is null");
- this.columnType = requireNonNull(columnType, "columnType is null");
-
- this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
- this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
- this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
-
- this.isMeasure = isMeasure;
- this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
- this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
- this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
- }
-
- @JsonProperty
- public String getConnectorId()
- {
- return connectorId;
- }
-
- @JsonProperty
- public String getColumnName()
- {
- return columnName;
- }
-
- @JsonProperty
- public Type getColumnType()
- {
- return columnType;
- }
-
- @JsonProperty
- public int getOrdinalPosition()
- {
- return ordinalPosition;
+ @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
+ @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
+ @JsonProperty("ordinalPosition") int ordinalPosition,
+ @JsonProperty("keyOrdinal") int keyOrdinal,
+ @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+ @JsonProperty("isMeasure") boolean isMeasure,
+ @JsonProperty("columnGroupId") int columnGroupId,
+ @JsonProperty("columnUniqueId") String columnUniqueId,
+ @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.columnName = requireNonNull(columnName, "columnName is null");
+ this.columnType = requireNonNull(columnType, "columnType is null");
+
+ this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
+ this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
+ this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
+
+ this.isMeasure = isMeasure;
+ this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
+ this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
+ this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+ }
+
+ @JsonProperty public String getConnectorId() {
+ return connectorId;
+ }
+
+ @JsonProperty public String getColumnName() {
+ return columnName;
+ }
+
+ @JsonProperty public Type getColumnType() {
+ return columnType;
+ }
+
+ @JsonProperty public int getOrdinalPosition() {
+ return ordinalPosition;
+ }
+
+ public ColumnMetadata getColumnMetadata() {
+ return new ColumnMetadata(columnName, columnType, null, false);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(connectorId, columnName);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
}
-
- public ColumnMetadata getColumnMetadata()
- {
- return new ColumnMetadata(columnName, columnType, null, false);
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
}
- @Override
- public int hashCode()
- {
- return Objects.hash(connectorId, columnName);
- }
+ CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+ return Objects.equals(this.connectorId, other.connectorId) && Objects
+ .equals(this.columnName, other.columnName);
+ }
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if ((obj == null) || (getClass() != obj.getClass())) {
- return false;
- }
-
- CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
- return Objects.equals(this.connectorId, other.connectorId) &&
- Objects.equals(this.columnName, other.columnName);
- }
-
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("connectorId", connectorId)
- .add("columnName", columnName)
- .add("columnType", columnType)
- .add("ordinalPosition", ordinalPosition)
- .toString();
- }
+ @Override public String toString() {
+ return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
+ .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
index 90b4944..0f1dbda 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -26,63 +26,50 @@ import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
-public class CarbondataConnector
- implements Connector
-{
+public class CarbondataConnector implements Connector {
- private static final Logger log = Logger.get(CarbondataConnector.class);
+ private static final Logger log = Logger.get(CarbondataConnector.class);
- private final LifeCycleManager lifeCycleManager;
- private final CarbondataMetadata metadata;
- private final ConnectorSplitManager splitManager;
- private final ConnectorRecordSetProvider recordSetProvider;
- private final ClassLoader classLoader;
+ private final LifeCycleManager lifeCycleManager;
+ private final CarbondataMetadata metadata;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorRecordSetProvider recordSetProvider;
+ private final ClassLoader classLoader;
+ public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+ ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+ ClassLoader classLoader) {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
- public CarbondataConnector(LifeCycleManager lifeCycleManager,
- CarbondataMetadata metadata,
- ConnectorSplitManager splitManager,
- ConnectorRecordSetProvider recordSetProvider,
- ClassLoader classLoader)
- {
- this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
- this.metadata = requireNonNull(metadata, "metadata is null");
- this.splitManager = requireNonNull(splitManager, "splitManager is null");
- this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
- this.classLoader = requireNonNull(classLoader, "classLoader is null");
- }
-
- @Override
- public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) {
- checkConnectorSupports(READ_COMMITTED, isolationLevel);
- return CarbondataTransactionHandle.INSTANCE;
- }
+ @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+ boolean readOnly) {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return CarbondataTransactionHandle.INSTANCE;
+ }
- @Override
- public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
- metadata.putClassLoader(classLoader);
- return metadata;
- }
+ @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+ metadata.putClassLoader(classLoader);
+ return metadata;
+ }
- @Override
- public ConnectorSplitManager getSplitManager() {
- return splitManager;
- }
+ @Override public ConnectorSplitManager getSplitManager() {
+ return splitManager;
+ }
- @Override
- public ConnectorRecordSetProvider getRecordSetProvider()
- {
- return recordSetProvider;
- }
+ @Override public ConnectorRecordSetProvider getRecordSetProvider() {
+ return recordSetProvider;
+ }
- @Override
- public final void shutdown()
- {
- try {
- lifeCycleManager.stop();
- }
- catch (Exception e) {
- log.error(e, "Error shutting down connector");
- }
+ @Override public final void shutdown() {
+ try {
+ lifeCycleManager.stop();
+ } catch (Exception e) {
+ log.error(e, "Error shutting down connector");
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 324699c..b146a2e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -31,69 +31,60 @@ import java.util.Map;
import static java.util.Objects.requireNonNull;
-public class CarbondataConnectorFactory
- implements ConnectorFactory {
-
- private final String name;
- private final ClassLoader classLoader;
-
- public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader){
- this.name = connectorName;
- this.classLoader = requireNonNull(classLoader, "classLoader is null");
- }
-
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public ConnectorHandleResolver getHandleResolver() {
- return new CarbondataHandleResolver();
- }
-
- @Override
- public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
- requireNonNull(config, "config is null");
-
- try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
- Bootstrap app = new Bootstrap(new JsonModule(), new CarbondataModule(connectorId, context.getTypeManager()));
-
- Injector injector = app
- .strictConfig()
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
-
- LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
- CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
- //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
- ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
- ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class);
- //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
-
- //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
- //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
- //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
- //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
- //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
-
-
- return new CarbondataConnector(
- lifeCycleManager,
- metadata,
- new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
- connectorRecordSet,//new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
- classLoader
- //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
- //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
- //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
- );
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
+public class CarbondataConnectorFactory implements ConnectorFactory {
+
+ private final String name;
+ private final ClassLoader classLoader;
+
+ public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+ this.name = connectorName;
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
+
+ @Override public String getName() {
+ return name;
+ }
+
+ @Override public ConnectorHandleResolver getHandleResolver() {
+ return new CarbondataHandleResolver();
+ }
+
+ @Override public Connector create(String connectorId, Map<String, String> config,
+ ConnectorContext context) {
+ requireNonNull(config, "config is null");
+
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ Bootstrap app = new Bootstrap(new JsonModule(),
+ new CarbondataModule(connectorId, context.getTypeManager()));
+
+ Injector injector =
+ app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
+ .initialize();
+
+ LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+ CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+ //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
+ ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+ ConnectorRecordSetProvider connectorRecordSet =
+ injector.getInstance(ConnectorRecordSetProvider.class);
+ //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
+
+ //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+ //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
+ //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+ //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
+ //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+
+ return new CarbondataConnector(lifeCycleManager, metadata,
+ new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
+ //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
+ classLoader
+ //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+ //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+ //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+ );
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
index 5aa72f1..b4ba1dd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
@@ -23,39 +23,30 @@ import java.util.Objects;
import static java.util.Objects.requireNonNull;
-public class CarbondataConnectorId
-{
- private final String id;
-
- @Inject
- public CarbondataConnectorId(String id)
- {
- this.id = requireNonNull(id, "id is null");
- }
+public class CarbondataConnectorId {
+ private final String id;
- @Override
- public String toString()
- {
- return id;
- }
+ @Inject public CarbondataConnectorId(String id) {
+ this.id = requireNonNull(id, "id is null");
+ }
- @Override
- public int hashCode()
- {
- return Objects.hash(id);
- }
+ @Override public String toString() {
+ return id;
+ }
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
+ @Override public int hashCode() {
+ return Objects.hash(id);
+ }
- if ((obj == null) || (getClass() != obj.getClass())) {
- return false;
- }
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
- return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
}
+
+ return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
index 5918b46..7c65bfd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
@@ -21,29 +21,23 @@ import com.facebook.presto.spi.*;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
public class CarbondataHandleResolver implements ConnectorHandleResolver {
- @Override
- public Class<? extends ConnectorTableHandle> getTableHandleClass() {
- return CarbondataTableHandle.class;
- }
+ @Override public Class<? extends ConnectorTableHandle> getTableHandleClass() {
+ return CarbondataTableHandle.class;
+ }
- @Override
- public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
- return CarbondataTableLayoutHandle.class;
- }
+ @Override public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
+ return CarbondataTableLayoutHandle.class;
+ }
- @Override
- public Class<? extends ColumnHandle> getColumnHandleClass() {
- return CarbondataColumnHandle.class;
- }
+ @Override public Class<? extends ColumnHandle> getColumnHandleClass() {
+ return CarbondataColumnHandle.class;
+ }
- @Override
- public Class<? extends ConnectorSplit> getSplitClass() {
- return CarbondataSplit.class;
- }
+ @Override public Class<? extends ConnectorSplit> getSplitClass() {
+ return CarbondataSplit.class;
+ }
- @Override
- public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
- {
- return CarbondataTransactionHandle.class;
- }
+ @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
+ return CarbondataTransactionHandle.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index 7fa96c3..524ce20 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -37,46 +37,36 @@ import static org.apache.carbondata.presto.Types.checkType;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-public class CarbondataMetadata
- implements ConnectorMetadata
-{
- private final String connectorId;
- private CarbonTableReader carbonTableReader;
- private ClassLoader classLoader;
-
- private Map<String, ColumnHandle> columnHandleMap;
-
- @Inject
- public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader)
- {
- this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
- this.carbonTableReader = requireNonNull(reader, "client is null");
+public class CarbondataMetadata implements ConnectorMetadata {
+ private final String connectorId;
+ private CarbonTableReader carbonTableReader;
+ private ClassLoader classLoader;
+
+ private Map<String, ColumnHandle> columnHandleMap;
+
+ @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = requireNonNull(reader, "client is null");
+ }
+
+ public void putClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ @Override public List<String> listSchemaNames(ConnectorSession session) {
+ return listSchemaNamesInternal();
+ }
+
+ public List<String> listSchemaNamesInternal() {
+ List<String> ret;
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ ret = carbonTableReader.getSchemaNames();
}
+ return ret;
+ }
-
- public void putClassLoader(ClassLoader classLoader)
- {
- this.classLoader = classLoader;
- }
-
-
- @Override
- public List<String> listSchemaNames(ConnectorSession session) {
- return listSchemaNamesInternal();
- }
-
-
- public List<String> listSchemaNamesInternal()
- {
- List<String> ret;
- try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
- ret = carbonTableReader.getSchemaNames();
- }
- return ret;
- }
-
- @Override
- public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+ @Override
+ public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
/*List<SchemaTableName> all = carbonTableReader.getTableList();
if(schemaNameOrNull != null)
@@ -85,225 +75,206 @@ public class CarbondataMetadata
}
return all;*/
- List<String> schemaNames;
- if (schemaNameOrNull != null) {
- schemaNames = ImmutableList.of(schemaNameOrNull);
- }
- else {
- schemaNames = carbonTableReader.getSchemaNames();
- }
-
- ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
- for (String schemaName : schemaNames) {
- for (String tableName : carbonTableReader.getTableNames(schemaName)) {
- builder.add(new SchemaTableName(schemaName, tableName));
- }
- }
- return builder.build();
+ List<String> schemaNames;
+ if (schemaNameOrNull != null) {
+ schemaNames = ImmutableList.of(schemaNameOrNull);
+ } else {
+ schemaNames = carbonTableReader.getSchemaNames();
}
- @Override
- public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
- requireNonNull(prefix, "SchemaTablePrefix is null");
-
- ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
- for (SchemaTableName tableName : listTables(session, prefix)) {
- ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
- if (tableMetadata != null) {
- columns.put(tableName, tableMetadata.getColumns());
- }
- }
- return columns.build();
+ ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+ for (String schemaName : schemaNames) {
+ for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+ builder.add(new SchemaTableName(schemaName, tableName));
+ }
}
-
- //if prefix is null. return all tables
- //if prefix is not null, just return this table
- private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
- {
- if (prefix.getSchemaName() == null) {
- return listTables(session, prefix.getSchemaName());
- }
- return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ return builder.build();
+ }
+
+ @Override
+ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
+ SchemaTablePrefix prefix) {
+ requireNonNull(prefix, "SchemaTablePrefix is null");
+
+ ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+ for (SchemaTableName tableName : listTables(session, prefix)) {
+ ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+ if (tableMetadata != null) {
+ columns.put(tableName, tableMetadata.getColumns());
+ }
}
-
- private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
- {
- if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
- return null;
- }
-
- CarbonTable cb = carbonTableReader.getTable(tableName);
- if (cb == null) {
- return null;
- }
-
- List<ColumnMetadata> spiCols = new LinkedList<>();
- List<CarbonDimension> cols = cb.getDimensionByTableName(tableName.getTableName());
- for(CarbonDimension col : cols)
- {
- //show columns command will return these data
- Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
- ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
- spiCols.add(spiCol);
- }
-
- List<CarbonMeasure> mcols = cb.getMeasureByTableName(tableName.getTableName());
- for(CarbonMeasure mcol : mcols)
- {
- Type spiType = CarbondataType2SpiMapper(mcol.getColumnSchema().getDataType());
- ColumnMetadata spiCol = new ColumnMetadata(mcol.getColumnSchema().getColumnName(), spiType);
- spiCols.add(spiCol);
- }
-
- //encapsulate carbonTable
- return new ConnectorTableMetadata(tableName, spiCols);
+ return columns.build();
+ }
+
+ //if prefix is null, return all tables
+ //if prefix is not null, just return this table
+ private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
+ if (prefix.getSchemaName() == null) {
+ return listTables(session, prefix.getSchemaName());
}
+ return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ }
- @Override
- public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
-
- CarbondataTableHandle handle = checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
- checkArgument(handle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
-
- String schemaName = handle.getSchemaTableName().getSchemaName();
- if (!listSchemaNamesInternal().contains(schemaName)) {
- throw new SchemaNotFoundException(schemaName);
- }
-
- //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
- CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
- if (cb == null) {
- throw new TableNotFoundException(handle.getSchemaTableName());
- }
-
- ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
- int index = 0;
- String tableName = handle.getSchemaTableName().getTableName();
- for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
- ColumnSchema cs = column.getColumnSchema();
-
- int complex = column.getComplexTypeOrdinal();
- column.getNumberOfChild();
- column.getListOfChildDimensions();
-
- Type spiType = CarbondataType2SpiMapper(cs.getDataType());
- columnHandles.put(
- cs.getColumnName(),
- new CarbondataColumnHandle(
- connectorId,
- cs.getColumnName(),
- spiType,
- index,
- column.getKeyOrdinal(),
- column.getColumnGroupOrdinal(),
- false,
- cs.getColumnGroupId(),
- cs.getColumnUniqueId(),
- cs.isUseInvertedIndex()));
- index++;
- }
+ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
+ if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+ return null;
+ }
- for(CarbonMeasure measure : cb.getMeasureByTableName(tableName)){
- ColumnSchema cs = measure.getColumnSchema();
-
- Type spiType = CarbondataType2SpiMapper(cs.getDataType());
- columnHandles.put(
- cs.getColumnName(),
- new CarbondataColumnHandle(
- connectorId,
- cs.getColumnName(),
- spiType,
- index,
- measure.getOrdinal(),
- cs.getColumnGroupId(),
- true,
- cs.getColumnGroupId(),
- cs.getColumnUniqueId(),
- cs.isUseInvertedIndex()));
- index++;
- }
+ CarbonTable cb = carbonTableReader.getTable(tableName);
+ if (cb == null) {
+ return null;
+ }
- //should i cache it?
- columnHandleMap = columnHandles.build();
+ List<ColumnMetadata> spiCols = new LinkedList<>();
+ List<CarbonDimension> cols = cb.getDimensionByTableName(tableName.getTableName());
+ for (CarbonDimension col : cols) {
+ //show columns command will return these data
+ Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+ ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+ spiCols.add(spiCol);
+ }
- return columnHandleMap;
+ List<CarbonMeasure> mcols = cb.getMeasureByTableName(tableName.getTableName());
+ for (CarbonMeasure mcol : mcols) {
+ Type spiType = CarbondataType2SpiMapper(mcol.getColumnSchema().getDataType());
+ ColumnMetadata spiCol = new ColumnMetadata(mcol.getColumnSchema().getColumnName(), spiType);
+ spiCols.add(spiCol);
}
- @Override
- public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+ //encapsulate carbonTable
+ return new ConnectorTableMetadata(tableName, spiCols);
+ }
- checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
- return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle").getColumnMetadata();
- }
+ @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
+ ConnectorTableHandle tableHandle) {
- @Override
- public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
- //check tablename is valid
- //schema is exist
- //tables is exist
+ CarbondataTableHandle handle =
+ checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+ checkArgument(handle.getConnectorId().equals(connectorId),
+ "tableHandle is not for this connector");
- //CarbondataTable get from jar
- return new CarbondataTableHandle(connectorId, tableName);
+ String schemaName = handle.getSchemaTableName().getSchemaName();
+ if (!listSchemaNamesInternal().contains(schemaName)) {
+ throw new SchemaNotFoundException(schemaName);
}
- @Override
- public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) {
- CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
- ConnectorTableLayout layout = new ConnectorTableLayout(new CarbondataTableLayoutHandle(handle,constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
- return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+ CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+ if (cb == null) {
+ throw new TableNotFoundException(handle.getSchemaTableName());
}
- @Override
- public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
- return new ConnectorTableLayout(handle);
+ ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+ int index = 0;
+ String tableName = handle.getSchemaTableName().getTableName();
+ for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+ ColumnSchema cs = column.getColumnSchema();
+
+ int complex = column.getComplexTypeOrdinal();
+ column.getNumberOfChild();
+ column.getListOfChildDimensions();
+
+ Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ columnHandles.put(cs.getColumnName(),
+ new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, index,
+ column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ index++;
}
- @Override
- public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
- return getTableMetadataInternal(table);
- }
+ for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+ ColumnSchema cs = measure.getColumnSchema();
- public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table)
- {
- CarbondataTableHandle carbondataTableHandle = checkType(table, CarbondataTableHandle.class, "table");
- checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
- return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+ Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ columnHandles.put(cs.getColumnName(),
+ new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, index,
+ measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ index++;
}
-
- public static Type CarbondataType2SpiMapper(DataType colType)
- {
- switch (colType)
- {
- case BOOLEAN:
- return BooleanType.BOOLEAN;
- case SHORT:
- return SmallintType.SMALLINT;
- case INT:
- return IntegerType.INTEGER;
- case LONG:
- return BigintType.BIGINT;
- case FLOAT:
- case DOUBLE:
- return DoubleType.DOUBLE;
-
- case DECIMAL:
- return DecimalType.createDecimalType();
- case STRING:
- return VarcharType.VARCHAR;
- case DATE:
- return DateType.DATE;
- case TIMESTAMP:
- return TimestampType.TIMESTAMP;
+ //should i cache it?
+ columnHandleMap = columnHandles.build();
+
+ return columnHandleMap;
+ }
+
+ @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
+ ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+ checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+ return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
+ .getColumnMetadata();
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ //check tablename is valid
+ //schema is exist
+ //tables is exist
+
+ //CarbondataTable get from jar
+ return new CarbondataTableHandle(connectorId, tableName);
+ }
+
+ @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
+ ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
+ Optional<Set<ColumnHandle>> desiredColumns) {
+ CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+ ConnectorTableLayout layout = new ConnectorTableLayout(
+ new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
+ ConnectorTableLayoutHandle handle) {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
+ ConnectorTableHandle table) {
+ return getTableMetadataInternal(table);
+ }
+
+ public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
+ CarbondataTableHandle carbondataTableHandle =
+ checkType(table, CarbondataTableHandle.class, "table");
+ checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
+ "tableHandle is not for this connector");
+ return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+ }
+
+ public static Type CarbondataType2SpiMapper(DataType colType) {
+ switch (colType) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case SHORT:
+ return SmallintType.SMALLINT;
+ case INT:
+ return IntegerType.INTEGER;
+ case LONG:
+ return BigintType.BIGINT;
+ case FLOAT:
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+
+ case DECIMAL:
+ return DecimalType.createDecimalType();
+ case STRING:
+ return VarcharType.VARCHAR;
+ case DATE:
+ return DateType.DATE;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
/*case DataType.MAP:
case DataType.ARRAY:
case DataType.STRUCT:
case DataType.NULL:*/
- default:
- return VarcharType.VARCHAR;
- }
+ default:
+ return VarcharType.VARCHAR;
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index b329678..0baa64a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -38,46 +38,39 @@ import static java.util.Objects.requireNonNull;
public class CarbondataModule implements Module {
- private final String connectorId;
- private final TypeManager typeManager;
+ private final String connectorId;
+ private final TypeManager typeManager;
- public CarbondataModule(String connectorId, TypeManager typeManager)
- {
- this.connectorId = requireNonNull(connectorId, "connector id is null");
- this.typeManager = requireNonNull(typeManager, "typeManager is null");
- }
+ public CarbondataModule(String connectorId, TypeManager typeManager) {
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
- @Override
- public void configure(Binder binder) {
- binder.bind(TypeManager.class).toInstance(typeManager);
+ @Override public void configure(Binder binder) {
+ binder.bind(TypeManager.class).toInstance(typeManager);
- binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
- binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
- binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
- binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
- binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
- binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
- configBinder(binder).bindConfig(CarbonTableConfig.class);
- }
+ binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+ binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
+ .in(Scopes.SINGLETON);
+ binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(CarbonTableConfig.class);
+ }
- public static final class TypeDeserializer
- extends FromStringDeserializer<Type>
- {
- private final TypeManager typeManager;
+ public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+ private final TypeManager typeManager;
- @Inject
- public TypeDeserializer(TypeManager typeManager)
- {
- super(Type.class);
- this.typeManager = requireNonNull(typeManager, "typeManager is null");
- }
+ @Inject public TypeDeserializer(TypeManager typeManager) {
+ super(Type.class);
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
- @Override
- protected Type _deserialize(String value, DeserializationContext context)
- {
- Type type = typeManager.getType(parseTypeSignature(value));
- checkArgument(type != null, "Unknown type %s", value);
- return type;
- }
+ @Override protected Type _deserialize(String value, DeserializationContext context) {
+ Type type = typeManager.getType(parseTypeSignature(value));
+ checkArgument(type != null, "Unknown type %s", value);
+ return type;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
index 3bcfe4f..191f13b 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
@@ -24,13 +24,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
public class CarbondataPlugin implements Plugin {
- @Override
- public Iterable<ConnectorFactory> getConnectorFactories()
- {
- return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
- }
+ @Override public Iterable<ConnectorFactory> getConnectorFactories() {
+ return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
+ }
- private static ClassLoader getClassLoader() {
- return FileFactory.class.getClassLoader();
- }
+ private static ClassLoader getClassLoader() {
+ return FileFactory.class.getClassLoader();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index 7d65efd..3314ac4 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -38,119 +38,107 @@ import static com.google.common.base.Preconditions.checkState;
public class CarbondataRecordCursor implements RecordCursor {
- private static final Logger log = Logger.get(CarbondataRecordCursor.class);
- private final List<CarbondataColumnHandle> columnHandles;
-
- private List<String> fields;
- private CarbondataSplit split;
- private CarbonIterator<Object[]> rowCursor;
- private CarbonReadSupport<Object[]> readSupport;
-
- private long totalBytes;
- private long nanoStart;
- private long nanoEnd;
-
- public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport, CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles, CarbondataSplit split) {
- this.rowCursor = carbonIterator;
- this.columnHandles = columnHandles;
- this.readSupport = readSupport;
- this.totalBytes = 0;
- }
-
-
- @Override
- public long getTotalBytes() {
- return totalBytes;
- }
-
- @Override
- public long getCompletedBytes() {
- return totalBytes;
- }
-
- @Override
- public long getReadTimeNanos() {
- return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
- }
-
- @Override
- public Type getType(int field) {
-
- checkArgument(field < columnHandles.size(), "Invalid field index");
- return columnHandles.get(field).getColumnType();
- }
+ private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+ private final List<CarbondataColumnHandle> columnHandles;
- @Override
- public boolean advanceNextPosition() {
+ private List<String> fields;
+ private CarbondataSplit split;
+ private CarbonIterator<Object[]> rowCursor;
+ private CarbonReadSupport<Object[]> readSupport;
- if (nanoStart == 0) {
- nanoStart = System.nanoTime();
- }
+ private long totalBytes;
+ private long nanoStart;
+ private long nanoEnd;
- if(rowCursor.hasNext())
- {
- fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString()).collect(Collectors.toList());
+ public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+ CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
+ CarbondataSplit split) {
+ this.rowCursor = carbonIterator;
+ this.columnHandles = columnHandles;
+ this.readSupport = readSupport;
+ this.totalBytes = 0;
+ }
- totalBytes += fields.size();
- return true;
- }
- return false;
- }
-
- @Override
- public boolean getBoolean(int field) {
- checkFieldType(field, BOOLEAN);
- return Boolean.parseBoolean(getFieldValue(field));
- }
+ @Override public long getTotalBytes() {
+ return totalBytes;
+ }
- @Override
- public long getLong(int field) {
- String timeStr = getFieldValue(field);
- Long milliSec = 0L;
+ @Override public long getCompletedBytes() {
+ return totalBytes;
+ }
- //suppose the
- return Math.round(Double.parseDouble(getFieldValue(field)));
- }
+ @Override public long getReadTimeNanos() {
+ return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+ }
- @Override
- public double getDouble(int field) {
- checkFieldType(field, DOUBLE);
- return Double.parseDouble(getFieldValue(field));
- }
+ @Override public Type getType(int field) {
- @Override
- public Slice getSlice(int field) {
- checkFieldType(field, VARCHAR);
- return Slices.utf8Slice(getFieldValue(field));
- }
+ checkArgument(field < columnHandles.size(), "Invalid field index");
+ return columnHandles.get(field).getColumnType();
+ }
- @Override
- public Object getObject(int field) {
- return null;
- }
-
- @Override
- public boolean isNull(int field) {
- checkArgument(field < columnHandles.size(), "Invalid field index");
- return Strings.isNullOrEmpty(getFieldValue(field));
- }
-
- String getFieldValue(int field)
- {
- checkState(fields != null, "Cursor has not been advanced yet");
- return fields.get(field);
- }
+ @Override public boolean advanceNextPosition() {
- private void checkFieldType(int field, Type expected)
- {
- Type actual = getType(field);
- checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual);
+ if (nanoStart == 0) {
+ nanoStart = System.nanoTime();
}
- @Override
- public void close() {
- nanoEnd = System.nanoTime();
+ if (rowCursor.hasNext()) {
+ fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString())
+ .collect(Collectors.toList());
- //todo delete cache from readSupport
+ totalBytes += fields.size();
+ return true;
}
+ return false;
+ }
+
+ @Override public boolean getBoolean(int field) {
+ checkFieldType(field, BOOLEAN);
+ return Boolean.parseBoolean(getFieldValue(field));
+ }
+
+ @Override public long getLong(int field) {
+ String timeStr = getFieldValue(field);
+ Long milliSec = 0L;
+
+ //suppose the
+ return Math.round(Double.parseDouble(getFieldValue(field)));
+ }
+
+ @Override public double getDouble(int field) {
+ checkFieldType(field, DOUBLE);
+ return Double.parseDouble(getFieldValue(field));
+ }
+
+ @Override public Slice getSlice(int field) {
+ checkFieldType(field, VARCHAR);
+ return Slices.utf8Slice(getFieldValue(field));
+ }
+
+ @Override public Object getObject(int field) {
+ return null;
+ }
+
+ @Override public boolean isNull(int field) {
+ checkArgument(field < columnHandles.size(), "Invalid field index");
+ return Strings.isNullOrEmpty(getFieldValue(field));
+ }
+
+ String getFieldValue(int field) {
+ checkState(fields != null, "Cursor has not been advanced yet");
+ return fields.get(field);
+ }
+
+ private void checkFieldType(int field, Type expected) {
+ Type actual = getType(field);
+ checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
+ expected, actual);
+ }
+
+ @Override public void close() {
+ nanoEnd = System.nanoTime();
+
+ //todo delete cache from readSupport
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 737d2f1..af37728 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -43,74 +43,68 @@ import static org.apache.carbondata.presto.Types.checkType;
public class CarbondataRecordSet implements RecordSet {
- private CarbonTable carbonTable;
- private TupleDomain<ColumnHandle> originalConstraint;
- private Expression carbonConstraint;
- private List<CarbondataColumnConstraint> rebuildConstraints;
- private QueryModel queryModel;
- private CarbondataSplit split;
- private List<CarbondataColumnHandle> columns;
- private QueryExecutor queryExecutor;
+ private CarbonTable carbonTable;
+ private TupleDomain<ColumnHandle> originalConstraint;
+ private Expression carbonConstraint;
+ private List<CarbondataColumnConstraint> rebuildConstraints;
+ private QueryModel queryModel;
+ private CarbondataSplit split;
+ private List<CarbondataColumnHandle> columns;
+ private QueryExecutor queryExecutor;
- private CarbonReadSupport<Object[]> readSupport;
+ private CarbonReadSupport<Object[]> readSupport;
- public CarbondataRecordSet(
- CarbonTable carbonTable,
- ConnectorSession session,
- ConnectorSplit split,
- List<CarbondataColumnHandle> columns,
- QueryModel queryModel){
- this.carbonTable = carbonTable;
- this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
- this.originalConstraint = this.split.getConstraints();
- this.rebuildConstraints = this.split.getRebuildConstraints();
- this.queryModel = queryModel;
- this.columns = columns;
- this.readSupport = new DictionaryDecodeReadSupport();
- }
+ public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
+ ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+ this.carbonTable = carbonTable;
+ this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+ this.originalConstraint = this.split.getConstraints();
+ this.rebuildConstraints = this.split.getRebuildConstraints();
+ this.queryModel = queryModel;
+ this.columns = columns;
+ this.readSupport = new DictionaryDecodeReadSupport();
+ }
- //todo support later
- private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
- return null;
- }
+ //todo support later
+ private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+ return null;
+ }
- @Override
- public List<Type> getColumnTypes() {
- return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
- }
+ @Override public List<Type> getColumnTypes() {
+ return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+ }
- @Override
- public RecordCursor cursor() {
- List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+ @Override public RecordCursor cursor() {
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
- //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
+ //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
/*BlockletInfos blockletInfos = new BlockletInfos(split.getLocalInputSplit().getNumberOfBlocklets(), 0,
split.getLocalInputSplit().getNumberOfBlocklets());*/
- tableBlockInfoList.add(
- new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
- split.getLocalInputSplit().getStart(),
- split.getLocalInputSplit().getSegmentId(),
- split.getLocalInputSplit().getLocations().toArray(new String[0]),
- split.getLocalInputSplit().getLength(),
- //blockletInfos,
- ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
- queryModel.setTableBlockInfos(tableBlockInfoList);
+ tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+ split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
+ split.getLocalInputSplit().getLocations().toArray(new String[0]),
+ split.getLocalInputSplit().getLength(),
+ //blockletInfos,
+ ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+ queryModel.setTableBlockInfos(tableBlockInfoList);
- queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
- //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
- try {
- readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
- CarbonIterator<Object[]> carbonIterator = new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
- RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
- return rc;
- } catch (QueryExecutionException e) {
- //throw new InterruptedException(e.getMessage());
- System.out.println(e.getMessage());
- } catch(Exception ex) {
- System.out.println(ex.toString());
- }
- return null;
+ //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+ try {
+ readSupport
+ .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+ CarbonIterator<Object[]> carbonIterator =
+ new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+ RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+ return rc;
+ } catch (QueryExecutionException e) {
+ //throw new InterruptedException(e.getMessage());
+ System.out.println(e.getMessage());
+ } catch (Exception ex) {
+ System.out.println(ex.toString());
}
+ return null;
+ }
}