You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/03/29 09:07:06 UTC

[1/4] incubator-carbondata git commit: [CARBONDATA-826] Create carbondata-connector for query carbon data in presto

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 08a6e8161 -> bd5114097


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 45c905a..1d0eeed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -83,618 +83,614 @@ import java.util.stream.Stream;
 import static java.util.Objects.requireNonNull;
 
 public class CarbonTableReader {
-    //CarbonTableReader will be a facade of these utils
-    //[
-    // 1:CarbonMetadata,(logic table)
-    // 2:FileFactory, (physic table file)
-    // 3:CarbonCommonFactory, (offer some )
-    // 4:DictionaryFactory, (parse dictionary util)
-    //]
-
-    private CarbonTableConfig config;
-    private List<SchemaTableName> tableList;
-    private CarbonFile dbStore;
-    private FileFactory.FileType fileType;
-
-    private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;//as a cache for Carbon reader
-
-    @Inject
-    public CarbonTableReader(CarbonTableConfig config){
-        this.config = requireNonNull(config, "CarbonTableConfig is null");
-        this.cc = new ConcurrentHashMap<>();
-    }
-
-    public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
-        if(!cc.containsKey(table))//for worker node to initalize carbon metastore
-        {
-            try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
-                if(dbStore == null) {
-                    fileType = FileFactory.getFileType(config.getStorePath());
-                    try{
-                        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
-                    }catch (Exception ex){
-                        throw new RuntimeException(ex);
-                    }
-                }
-            }
-            updateSchemaTables();
-            parseCarbonMetadata(table);
+  //CarbonTableReader will be a facade of these utils
+  //[
+  // 1:CarbonMetadata,(logic table)
+  // 2:FileFactory, (physic table file)
+  // 3:CarbonCommonFactory, (offer some )
+  // 4:DictionaryFactory, (parse dictionary util)
+  //]
+
+  private CarbonTableConfig config;
+  private List<SchemaTableName> tableList;
+  private CarbonFile dbStore;
+  private FileFactory.FileType fileType;
+
+  private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+      //as a cache for Carbon reader
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = requireNonNull(config, "CarbonTableConfig is null");
+    this.cc = new ConcurrentHashMap<>();
+  }
+
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+    if (!cc.containsKey(table))//for worker node to initalize carbon metastore
+    {
+      try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
+          FileFactory.class.getClassLoader())) {
+        if (dbStore == null) {
+          fileType = FileFactory.getFileType(config.getStorePath());
+          try {
+            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
         }
-
-        if(cc.containsKey(table))
-            return cc.get(table);
-        else
-            return null;//need to reload?*/
+      }
+      updateSchemaTables();
+      parseCarbonMetadata(table);
     }
 
-    public List<String> getSchemaNames() {
-        return updateSchemaList();
-    }
-
-    //default PathFilter
-    private static final PathFilter DefaultFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-    };
-
-    public boolean updateDbStore(){
-        if(dbStore == null) {
-            fileType = FileFactory.getFileType(config.getStorePath());
-            try{
-                dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
-            }catch (Exception ex){
-                throw new RuntimeException(ex);
-            }
-        }
-        return true;
-    }
+    if (cc.containsKey(table)) return cc.get(table);
+    else return null;//need to reload?*/
+  }
 
-    public List<String> updateSchemaList() {
-        updateDbStore();
+  public List<String> getSchemaNames() {
+    return updateSchemaList();
+  }
 
-        if(dbStore != null){
-            List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
-            return scs;
-        }
-        else
-            return ImmutableList.of();
+  //default PathFilter
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
     }
-
-
-    public Set<String> getTableNames(String schema) {
-        requireNonNull(schema, "schema is null");
-        return updateTableList(schema);
+  };
+
+  public boolean updateDbStore() {
+    if (dbStore == null) {
+      fileType = FileFactory.getFileType(config.getStorePath());
+      try {
+        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
     }
-
-    public Set<String> updateTableList(String dbName){
-        List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList());
-        if(schema.size() > 0)
-        {
-            return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet());
-        }
-        else
-            return ImmutableSet.of();
+    return true;
+  }
+
+  public List<String> updateSchemaList() {
+    updateDbStore();
+
+    if (dbStore != null) {
+      List<String> scs =
+          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return scs;
+    } else return ImmutableList.of();
+  }
+
+  public Set<String> getTableNames(String schema) {
+    requireNonNull(schema, "schema is null");
+    return updateTableList(schema);
+  }
+
+  public Set<String> updateTableList(String dbName) {
+    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+        .collect(Collectors.toList());
+    if (schema.size() > 0) {
+      return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+          .collect(Collectors.toSet());
+    } else return ImmutableSet.of();
+  }
+
+  public CarbonTable getTable(SchemaTableName schemaTableName) {
+    try {
+      updateSchemaTables();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
 
-    public CarbonTable getTable(SchemaTableName schemaTableName) {
-        try {
-            updateSchemaTables();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    requireNonNull(schemaTableName, "schemaTableName is null");
+    CarbonTable table = loadTableMetadata(schemaTableName);
 
-        requireNonNull(schemaTableName, "schemaTableName is null");
-        CarbonTable table = loadTableMetadata(schemaTableName);
+    return table;
+  }
 
-        return table;
+  public void updateSchemaTables() {
+    //update logic determine later
+    if (dbStore == null) {
+      updateSchemaList();
     }
 
-
-    public void updateSchemaTables()
-    {
-        //update logic determine later
-        if(dbStore == null)
-        {
-            updateSchemaList();
-        }
-
-        tableList = new LinkedList<>();
-        for(CarbonFile db: dbStore.listFiles())
-        {
-            if(!db.getName().endsWith(".mdt")) {
-                for (CarbonFile table : db.listFiles()) {
-                    tableList.add(new SchemaTableName(db.getName(), table.getName()));
-                }
-            }
+    tableList = new LinkedList<>();
+    for (CarbonFile db : dbStore.listFiles()) {
+      if (!db.getName().endsWith(".mdt")) {
+        for (CarbonFile table : db.listFiles()) {
+          tableList.add(new SchemaTableName(db.getName(), table.getName()));
         }
+      }
     }
+  }
 
-    private CarbonTable loadTableMetadata(SchemaTableName schemaTableName)
-    {
-        for (SchemaTableName table : tableList) {
-            if (!table.equals(schemaTableName))
-                continue;
+  private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+    for (SchemaTableName table : tableList) {
+      if (!table.equals(schemaTableName)) continue;
 
-            return parseCarbonMetadata(table);
-        }
-        return null;
+      return parseCarbonMetadata(table);
     }
-
-    /**
-     * parse carbon metadata into cc(CarbonTableReader cache)
-     **/
-    public CarbonTable parseCarbonMetadata(SchemaTableName table)
-    {
-        CarbonTable result = null;
-        try {
-            //\u8fd9\u4e2a\u5e94\u8be5\u653e\u5728StoreFactory
-            CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
-            if(cache.isValid())
-                return cache.carbonTable;
-
-            //Step1: get table meta path, load carbon table param
-            String storePath = config.getStorePath();
-            cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString());
-            cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
-            cc.put(table, cache);
-
-            //Step2: check file existed? read schema file
-            ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-                public TBase create() {
-                    return new org.apache.carbondata.format.TableInfo();
-                }
-            };
-            ThriftReader thriftReader =
-                    new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
-            thriftReader.open();
-            org.apache.carbondata.format.TableInfo tableInfo =
-                    (org.apache.carbondata.format.TableInfo) thriftReader.read();
-            thriftReader.close();
-
-            //Format Level\u7684TableInfo\uff0c \u9700\u8981\u8f6c\u6362\u6210Code Level\u7684TableInfo
-            SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-            TableInfo wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-                            storePath);
-            wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-            //\u52a0\u8f7d\u5230CarbonMetadata\u4ed3\u5e93
-            CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
-            cache.tableInfo = wrapperTableInfo;
-            cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
-            result = cache.carbonTable;
-        }catch (Exception ex) {
-            throw new RuntimeException(ex);
+    return null;
+  }
+
+  /**
+   * parse carbon metadata into cc(CarbonTableReader cache)
+   **/
+  public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+    CarbonTable result = null;
+    try {
+      //\u8fd9\u4e2a\u5e94\u8be5\u653e\u5728StoreFactory
+      CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+      if (cache.isValid()) return cache.carbonTable;
+
+      //Step1: get table meta path, load carbon table param
+      String storePath = config.getStorePath();
+      cache.carbonTableIdentifier =
+          new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+              UUID.randomUUID().toString());
+      cache.carbonTablePath =
+          PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+      cc.put(table, cache);
+
+      //Step2: check file existed? read schema file
+      ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+        public TBase create() {
+          return new org.apache.carbondata.format.TableInfo();
         }
-
-        return result;
+      };
+      ThriftReader thriftReader =
+          new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+
+      //Format Level\u7684TableInfo\uff0c \u9700\u8981\u8f6c\u6362\u6210Code Level\u7684TableInfo
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo wrapperTableInfo = schemaConverter
+          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+              storePath);
+      wrapperTableInfo.setMetaDataFilepath(
+          CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+      //\u52a0\u8f7d\u5230CarbonMetadata\u4ed3\u5e93
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+      cache.tableInfo = wrapperTableInfo;
+      cache.carbonTable = CarbonMetadata.getInstance()
+          .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+      result = cache.carbonTable;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
     }
 
-    public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
+    return result;
+  }
 
-        //\u5904\u7406filter, \u4e0b\u63a8filter\uff0c\u5c06\u5e94\u7528\u5728Segment\u7684\u7d22\u5f15\u4e0a
-        FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters) throws Exception {
 
-        AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
-        CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
-        List<String> invalidSegments = new ArrayList<>();
-        List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+    //\u5904\u7406filter, \u4e0b\u63a8filter\uff0c\u5c06\u5e94\u7528\u5728Segment\u7684\u7d22\u5f15\u4e0a
+    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
 
-        // get all valid segments and set them into the configuration
-        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
-        SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-        SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+    CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
-        tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
-        if (segments.getValidSegments().size() == 0) {
-            return new ArrayList<>(0);
-        }
+    // get all valid segments and set them into the configuration
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+        segmentStatusManager.getValidAndInvalidSegments();
 
-        // remove entry in the segment index if there are invalid segments
-        invalidSegments.addAll(segments.getInvalidSegments());
-        for (String invalidSegmentId : invalidSegments) {
-            invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
-        }
-        if (invalidSegments.size() > 0) {
-            List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
-            for(String segId: invalidSegments) {
-                invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
-            }
-            cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
-        }
-
-        // get filter for segment
-        CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
-        FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
-
-        List<CarbonLocalInputSplit> result = new ArrayList<>();
-        //for each segment fetch blocks matching filter in Driver BTree
-        for (String segmentNo : tableCacheModel.segments) {
-            try{
-                List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
-                for (DataRefNode dataRefNode : dataRefNodes) {
-                    BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
-                    TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-
-                    if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
-                        continue;
-                    }
-                    result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
-                            tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-                            Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-                            tableBlockInfo.getVersion().number()));
-                }
-            }catch (Exception ex){
-                throw new RuntimeException(ex);
-            }
-        }
-        cacheClient.close();
-        return result;
+    tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+    if (segments.getValidSegments().size() == 0) {
+      return new ArrayList<>(0);
     }
 
-    /**
-     * get data blocks of given segment
-     */
-    private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
-                                                     AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                     CarbonTablePath tablePath,
-                                                     FilterResolverIntf resolver,
-                                                     String segmentId,
-                                                     CacheClient cacheClient,
-                                                     SegmentUpdateStatusManager updateStatusManager) throws IOException {
-        //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
-        //QueryStatistic statistic = new QueryStatistic();
-
-        //\u8bfb\u53d6Segment \u5185\u90e8\u7684Index
-        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
-                getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
-                        updateStatusManager);
-
-        List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
-
-        if (null != segmentIndexMap) {
-            // build result
-            for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
-                List<DataRefNode> filterredBlocks;
-                // if no filter is given get all blocks from Btree Index
-                if (null == resolver) {
-                    filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-                } else {
-                    //ignore filter
-                    //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-
-                    // apply filter and get matching blocks
-                    filterredBlocks = filterExpressionProcessor
-                            .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
-                                    absoluteTableIdentifier);
-                }
-                resultFilterredBlocks.addAll(filterredBlocks);
-            }
-        }
-        //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-        //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
-        return resultFilterredBlocks;
+    // remove entry in the segment index if there are invalid segments
+    invalidSegments.addAll(segments.getInvalidSegments());
+    for (String invalidSegmentId : invalidSegments) {
+      invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+    }
+    if (invalidSegments.size() > 0) {
+      List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+          new ArrayList<>(invalidSegments.size());
+      for (String segId : invalidSegments) {
+        invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+      }
+      cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
     }
 
-    private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
-                                    UpdateVO updateDetails) {
-        if (null != updateDetails.getLatestUpdateTimestamp()
-                && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
-                .getRefreshedTimeStamp()) {
-            return true;
+    // get filter for segment
+    CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil
+        .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    //for each segment fetch blocks matching filter in Driver BTree
+    for (String segmentNo : tableCacheModel.segments) {
+      try {
+        List<DataRefNode> dataRefNodes =
+            getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
+                tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
+                updateStatusManager);
+        for (DataRefNode dataRefNode : dataRefNodes) {
+          BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+          TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+              updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+              updateStatusManager)) {
+            continue;
+          }
+          result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+              tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+              Arrays.asList(tableBlockInfo.getLocations()),
+              tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+              tableBlockInfo.getVersion().number()));
         }
-        return false;
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
     }
-
-    private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
-                                                                                                AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                                                                CarbonTablePath tablePath,
-                                                                                                String segmentId,
-                                                                                                CacheClient cacheClient,
-                                                                                                SegmentUpdateStatusManager updateStatusManager) throws IOException {
-        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-        SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-        boolean isSegmentUpdated = false;
-        Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
-        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-                new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-        segmentTaskIndexWrapper =
-                cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-        UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
-        if (null != segmentTaskIndexWrapper) {
-            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-            if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
-                taskKeys = segmentIndexMap.keySet();
-                isSegmentUpdated = true;
-            }
+    cacheClient.close();
+    return result;
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  private List<DataRefNode> getDataBlocksOfSegment(
+      FilterExpressionProcessor filterExpressionProcessor,
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
+      FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
+      SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
+    //QueryStatistic statistic = new QueryStatistic();
+
+    //\u8bfb\u53d6Segment \u5185\u90e8\u7684Index
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+            updateStatusManager);
+
+    List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+
+    if (null != segmentIndexMap) {
+      // build result
+      for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+        List<DataRefNode> filterredBlocks;
+        // if no filter is given get all blocks from Btree Index
+        if (null == resolver) {
+          filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+        } else {
+          //ignore filter
+          //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+
+          // apply filter and get matching blocks
+          filterredBlocks = filterExpressionProcessor
+              .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+                  absoluteTableIdentifier);
         }
+        resultFilterredBlocks.addAll(filterredBlocks);
+      }
+    }
+    //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
+    return resultFilterredBlocks;
+  }
+
+  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+      UpdateVO updateDetails) {
+    if (null != updateDetails.getLatestUpdateTimestamp()
+        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+        .getRefreshedTimeStamp()) {
+      return true;
+    }
+    return false;
+  }
+
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    boolean isSegmentUpdated = false;
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+    segmentTaskIndexWrapper =
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    if (null != segmentTaskIndexWrapper) {
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+      }
+    }
 
-        // if segment tree is not loaded, load the segment tree
-        if (segmentIndexMap == null || isSegmentUpdated) {
-
-            List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-            List<String> segs = new ArrayList<>();
-            segs.add(segmentId);
+    // if segment tree is not loaded, load the segment tree
+    if (segmentIndexMap == null || isSegmentUpdated) {
 
-            FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
-            List<InputSplit> splits = getSplit(fileStatusList, fs);
+      List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+      List<String> segs = new ArrayList<>();
+      segs.add(segmentId);
 
-            List<FileSplit> carbonSplits = new ArrayList<>();
-            for (InputSplit inputSplit : splits) {
-                FileSplit fileSplit = (FileSplit) inputSplit;
-                String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());//\u8fd9\u91cc\u7684seperator\u5e94\u8be5\u600e\u4e48\u52a0\uff1f\uff1f
-                if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
-                    continue;
-                }
-                carbonSplits.add(fileSplit);
-            }
+      FileSystem fs =
+          getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
+      List<InputSplit> splits = getSplit(fileStatusList, fs);
 
-            List<TableBlockInfo> tableBlockInfoList  = new ArrayList<>();
-            for (FileSplit inputSplit : carbonSplits) {
-                if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
-
-                    BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
-                    tableBlockInfoList.add(
-                            new TableBlockInfo(inputSplit.getPath().toString(),
-                                    inputSplit.getStart(),
-                                    segmentId,
-                                    inputSplit.getLocations(),
-                                    inputSplit.getLength(),
-                                    blockletInfos,
-                                    ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
-                                    null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//\u8fd9\u91cc\u7684null\u662f\u5426\u4f1a\u5f02\u5e38\uff1f
-                }
-            }
-
-            Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
-            segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
-            // get Btree blocks for given segment
-            tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-            tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
-            segmentTaskIndexWrapper =
-                    cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
-            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      List<FileSplit> carbonSplits = new ArrayList<>();
+      for (InputSplit inputSplit : splits) {
+        FileSplit fileSplit = (FileSplit) inputSplit;
+        String segId = CarbonTablePath.DataPathUtil
+            .getSegmentId(fileSplit.getPath().toString());//\u8fd9\u91cc\u7684seperator\u5e94\u8be5\u600e\u4e48\u52a0\uff1f\uff1f
+        if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+          continue;
         }
-        return segmentIndexMap;
+        carbonSplits.add(fileSplit);
+      }
+
+      List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+      for (FileSplit inputSplit : carbonSplits) {
+        if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
+            updateStatusManager, segmentId)) {
+
+          BlockletInfos blockletInfos = new BlockletInfos(0, 0,
+              0);//this level we do not need blocklet info!!!! Is this a trick?
+          tableBlockInfoList.add(
+              new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
+                  inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
+                  ColumnarFormatVersion
+                      .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//\u8fd9\u91cc\u7684null\u662f\u5426\u4f1a\u5f02\u5e38\uff1f
+        }
+      }
+
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+      // get Btree blocks for given segment
+      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+      tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+      segmentTaskIndexWrapper =
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
     }
+    return segmentIndexMap;
+  }
+
+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    String taskID = null;
+    if (null != carbonInputSplit) {
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+        return false;
+      }
 
-    private boolean isValidBlockBasedOnUpdateDetails(
-            Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
-            UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
-        String taskID = null;
-        if (null != carbonInputSplit) {
-            if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
-                return false;
-            }
-
-            if (null == taskKeys) {
-                return true;
+      if (null == taskKeys) {
+        return true;
+      }
+
+      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String bucketNo =
+          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+      String blockTimestamp = carbonInputSplit.getPath().getName()
+          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+              carbonInputSplit.getPath().getName().lastIndexOf('.'));
+      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskBucketHolder)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+      throws IOException {
+
+    Iterator split = fileStatusList.iterator();
+
+    List<InputSplit> splits = new ArrayList<>();
+
+    while (true) {
+      while (true) {
+        while (split.hasNext()) {
+          FileStatus file = (FileStatus) split.next();
+          Path path = file.getPath();
+          long length = file.getLen();
+          if (length != 0L) {
+            BlockLocation[] blkLocations;
+            if (file instanceof LocatedFileStatus) {
+              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+            } else {
+              blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
             }
 
-            taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
-            String bucketNo =
-                    CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
-
-            SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
-                    new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
-
-            String blockTimestamp = carbonInputSplit.getPath().getName()
-                    .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
-                            carbonInputSplit.getPath().getName().lastIndexOf('.'));
-            if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
-                    && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
-                if (!taskKeys.contains(taskBucketHolder)) {
-                    return true;
-                }
+            if (this.isSplitable()) {
+              long blockSize1 = file.getBlockSize();
+              long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+              long bytesRemaining;
+              int blkIndex;
+              for (
+                  bytesRemaining = length;
+                  (double) bytesRemaining / (double) splitSize > 1.1D;
+                  bytesRemaining -= splitSize) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+              }
+
+              if (bytesRemaining != 0L) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else {
+              splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                  blkLocations[0].getHosts()));
             }
+          } else {
+            splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                new String[0]));
+          }
         }
-        return false;
+        return splits;
+      }
     }
 
-    private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)  throws IOException  {
-
-        Iterator split = fileStatusList.iterator();
-
-        List<InputSplit> splits = new ArrayList<>();
-
-        while (true)
-        {
-            while (true)
-            {
-                while(split.hasNext()) {
-                    FileStatus file = (FileStatus) split.next();
-                    Path path = file.getPath();
-                    long length = file.getLen();
-                    if (length != 0L) {
-                        BlockLocation[] blkLocations;
-                        if (file instanceof LocatedFileStatus) {
-                            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
-                        } else {
-                            blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
-                        }
-
-                        if (this.isSplitable()) {
-                            long blockSize1 = file.getBlockSize();
-                            long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
-
-                            long bytesRemaining;
-                            int blkIndex;
-                            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) {
-                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
-                            }
-
-                            if (bytesRemaining != 0L) {
-                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
-                            }
-                        }
-                        else
-                        {
-                            splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, blkLocations[0].getHosts()));
-                        }
-                    }
-                    else {
-                        splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, new String[0]));
-                    }
-                }
-                return splits;
-            }
-        }
+  }
 
-    }
+  private String[] getValidPartitions() {
+    //TODO: has to Identify partitions by partition pruning
+    return new String[] { "0" };
+  }
 
-    private String[] getValidPartitions() {
-        //TODO: has to Identify partitions by partition pruning
-        return new String[] { "0" };
+  private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
+      List<FileStatus> result) throws IOException {
+    String[] partitionsToConsider = getValidPartitions();
+    if (partitionsToConsider.length == 0) {
+      throw new IOException("No partitions/data found");
     }
 
-    private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
-                                               CarbonTablePath tablePath,
-                                               List<FileStatus> result) throws IOException {
-        String[] partitionsToConsider = getValidPartitions();
-        if (partitionsToConsider.length == 0) {
-            throw new IOException("No partitions/data found");
-        }
-
-        FileSystem fs = null;
+    FileSystem fs = null;
 
-        //PathFilter inputFilter = getDataFileFilter(job);
+    //PathFilter inputFilter = getDataFileFilter(job);
 
-        // get tokens for all the required FileSystem for table path
+    // get tokens for all the required FileSystem for table path
         /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
                 job.getConfiguration());*/
 
-        //get all data files of valid partitions and segments
-        for (int i = 0; i < partitionsToConsider.length; ++i) {
-            String partition = partitionsToConsider[i];
-
-            for (int j = 0; j < segmentsToConsider.length; ++j) {
-                String segmentId = segmentsToConsider[j];
-                Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
-
-                try{
-                    Configuration conf = new Configuration();
-                    fs = segmentPath.getFileSystem(conf);
-                    //fs.initialize(segmentPath.toUri(), conf);
-
-                    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
-                    while (iter.hasNext()) {
-                        LocatedFileStatus stat = iter.next();
-                        //if(stat.getPath().toString().contains("carbondata"))//\u53c2\u770bcarbondata\u7684carbonInputFilter\u7684\u5b9e\u73b0
-                        if (DefaultFilter.accept(stat.getPath()))
-                        {
-                            if (stat.isDirectory()) {
-                                addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
-                            } else {
-                                result.add(stat);
-                            }
-                        }
-                    }
-                }catch (Exception ex){
-                    System.out.println(ex.toString());
-                }
-            }
-        }
-        return fs;
-    }
-
-    protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
-        RemoteIterator iter = fs.listLocatedStatus(path);
-
-        while(iter.hasNext()) {
-            LocatedFileStatus stat = (LocatedFileStatus)iter.next();
-            if(inputFilter.accept(stat.getPath())) {
-                if(stat.isDirectory()) {
-                    this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-                } else {
-                    result.add(stat);
-                }
-            }
-        }
-
-    }
+    //get all data files of valid partitions and segments
+    for (int i = 0; i < partitionsToConsider.length; ++i) {
+      String partition = partitionsToConsider[i];
 
-    /**
-     * get data blocks of given btree
-     */
-    private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
-        List<DataRefNode> blocks = new LinkedList<DataRefNode>();
-        SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+      for (int j = 0; j < segmentsToConsider.length; ++j) {
+        String segmentId = segmentsToConsider[j];
+        Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
 
         try {
-            IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
-            IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
-
-            // Add all blocks of btree into result
-            DataRefNodeFinder blockFinder =
-                    new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
-            DataRefNode startBlock =
-                    blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
-            DataRefNode endBlock =
-                    blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
-            while (startBlock != endBlock) {
-                blocks.add(startBlock);
-                startBlock = startBlock.getNextDataRefNode();
+          Configuration conf = new Configuration();
+          fs = segmentPath.getFileSystem(conf);
+          //fs.initialize(segmentPath.toUri(), conf);
+
+          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            //if(stat.getPath().toString().contains("carbondata"))//\u53c2\u770bcarbondata\u7684carbonInputFilter\u7684\u5b9e\u73b0
+            if (DefaultFilter.accept(stat.getPath())) {
+              if (stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+              } else {
+                result.add(stat);
+              }
             }
-            blocks.add(endBlock);
-
-        } catch (KeyGenException e) {
-            System.out.println("Could not generate start key" + e.getMessage());
+          }
+        } catch (Exception ex) {
+          System.out.println(ex.toString());
         }
-        return blocks;
+      }
     }
-
-    private boolean isSplitable() {
-        try {
-            // Don't split the file if it is local file system
-            if(this.fileType == FileFactory.FileType.LOCAL)
-            {
-                return false;
-            }
-        } catch (Exception e) {
-            return true;
+    return fs;
+  }
+
+  protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
+      PathFilter inputFilter) throws IOException {
+    RemoteIterator iter = fs.listLocatedStatus(path);
+
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = (LocatedFileStatus) iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
         }
-        return true;
+      }
     }
 
-    private long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-        return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  /**
+   * get data blocks of given btree
+   */
+  private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+    List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+    SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+    try {
+      IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+      IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+      // Add all blocks of btree into result
+      DataRefNodeFinder blockFinder =
+          new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+      DataRefNode startBlock =
+          blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+      DataRefNode endBlock =
+          blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+      while (startBlock != endBlock) {
+        blocks.add(startBlock);
+        startBlock = startBlock.getNextDataRefNode();
+      }
+      blocks.add(endBlock);
+
+    } catch (KeyGenException e) {
+      System.out.println("Could not generate start key" + e.getMessage());
     }
+    return blocks;
+  }
 
-    private FileSplit makeSplit(Path file, long start, long length,
-                                String[] hosts) {
-        return new FileSplit(file, start, length, hosts);
+  private boolean isSplitable() {
+    try {
+      // Don't split the file if it is local file system
+      if (this.fileType == FileFactory.FileType.LOCAL) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
     }
-
-    private int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-        for (int i = 0 ; i < blkLocations.length; i++) {
-            // is the offset inside this block?
-            if ((blkLocations[i].getOffset() <= offset) &&
-                    (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
-                return i;
-            }
-        }
-        BlockLocation last = blkLocations[blkLocations.length -1];
-        long fileLength = last.getOffset() + last.getLength() -1;
-        throw new IllegalArgumentException("Offset " + offset +
-                " is outside of file (0.." +
-                fileLength + ")");
+    return true;
+  }
+
+  private long computeSplitSize(long blockSize, long minSize, long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
+
+  private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) && (offset
+          < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
     }
-
-
-    /**
-     * get total number of rows. for count(*)
-     *
-     * @throws IOException
-     * @throws IndexBuilderException
-     */
-    public long getRowCount() throws IOException, IndexBuilderException {
-        long rowCount = 0;
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * get total number of rows. for count(*)
+   *
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  public long getRowCount() throws IOException, IndexBuilderException {
+    long rowCount = 0;
         /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
 
         // no of core to load the blocks in driver
@@ -733,6 +729,6 @@ public class CarbonTableReader {
         } catch (InterruptedException | ExecutionException e) {
             throw new IndexBuilderException(e);
         }*/
-        return rowCount;
-    }
+    return rowCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
index cba2562..25d5cd5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/GrtLtFilterProcessorTestCase.scala
@@ -32,7 +32,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
     sql("drop table if exists a12")
-    sql("drop table if exists a12_allnull")
+    sql("drop table if exists a12_all_null")
     sql("drop table if exists a12_no_null")
     sql("drop table if exists Test_Boundary1")
 
@@ -41,7 +41,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
         "dob timestamp,comm decimal(4,2),desc string) stored by 'org.apache.carbondata.format'"
     )
     sql(
-      "create table a12_allnull(empid String,ename String,sal double,deptno int,mgr string,gender" +
+      "create table a12_all_null(empid String,ename String,sal double,deptno int,mgr string,gender" +
         " string," +
         "dob timestamp,comm decimal(4,2),desc string) stored by 'org.apache.carbondata.format'"
     )
@@ -62,7 +62,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     testData = s"$resourcesPath/filter/emp2allnull.csv"
 
     sql(
-      s"""LOAD DATA LOCAL INPATH '$testData' into table a12_allnull OPTIONS('DELIMITER'=',',
+      s"""LOAD DATA LOCAL INPATH '$testData' into table a12_all_null OPTIONS('DELIMITER'=',',
          'QUOTECHAR'='"','FILEHEADER'='empid,ename,sal,deptno,mgr,gender,dob,comm,desc')"""
         .stripMargin
     )
@@ -115,34 +115,34 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("Less Than Filter all null") {
     checkAnswer(
-      sql("select count(empid) from a12_allnull where dob < '2014-07-01 12:07:28'"),
+      sql("select count(empid) from a12_all_null where dob < '2014-07-01 12:07:28'"),
       Seq(Row(0))
     )
   }
 
   test("Les Than equal Filter all null") {
     checkAnswer(
-      sql("select count (empid) from a12_allnull where dob <= '2014-07-01 12:07:28'"),
+      sql("select count (empid) from a12_all_null where dob <= '2014-07-01 12:07:28'"),
       Seq(Row(0))
     )
   }
 
   test("Greater Than Filter all null") {
     checkAnswer(
-      sql("select count (empid) from a12_allnull where dob > '2014-07-01 12:07:28'"),
+      sql("select count (empid) from a12_all_null where dob > '2014-07-01 12:07:28'"),
       Seq(Row(0))
     )
   }
 
   test("Greater Than equal to Filter all null") {
     checkAnswer(
-      sql("select count (empid) from a12_allnull where dob >= '2014-07-01 12:07:28'"),
+      sql("select count (empid) from a12_all_null where dob >= '2014-07-01 12:07:28'"),
       Seq(Row(0))
     )
   }
 //  test("In condition With improper format query regarding Null filter") {
 //    checkAnswer(
-//      sql("select empid from a12_allnull " + "where empid not in ('china',NULL)"),
+//      sql("select empid from a12_all_null " + "where empid not in ('china',NULL)"),
 //      Seq()
 //    )
 //  }
@@ -179,7 +179,7 @@ class GrtLtFilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll {
     sql("drop table a12")
-    sql("drop table if exists a12_allnull")
+    sql("drop table if exists a12_all_null")
     sql("drop table if exists a12_no_null")
     sql("drop table if exists Test_Boundary1")
     CarbonProperties.getInstance()


[4/4] incubator-carbondata git commit: [CARBONDATA-826] Create carbondata-connector of presto This closes #704

Posted by ch...@apache.org.
[CARBONDATA-826] Create carbondata-connector of presto This closes #704


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bd511409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bd511409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bd511409

Branch: refs/heads/master
Commit: bd511409796721f8ef8d5541604c49d6a277a6a0
Parents: 08a6e81 2712330
Author: chenliang613 <ch...@huawei.com>
Authored: Wed Mar 29 14:36:42 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Wed Mar 29 14:36:42 2017 +0530

----------------------------------------------------------------------
 integration/presto/README.md                    |   86 ++
 integration/presto/pom.xml                      |   22 +-
 .../presto/CarbondataColumnConstraint.java      |  101 +-
 .../presto/CarbondataColumnHandle.java          |  205 ++--
 .../carbondata/presto/CarbondataConnector.java  |   87 +-
 .../presto/CarbondataConnectorFactory.java      |  119 +-
 .../presto/CarbondataConnectorId.java           |   49 +-
 .../presto/CarbondataHandleResolver.java        |   36 +-
 .../carbondata/presto/CarbondataMetadata.java   |  427 ++++---
 .../carbondata/presto/CarbondataModule.java     |   63 +-
 .../carbondata/presto/CarbondataPlugin.java     |   14 +-
 .../presto/CarbondataRecordCursor.java          |  190 ++-
 .../carbondata/presto/CarbondataRecordSet.java  |  110 +-
 .../presto/CarbondataRecordSetProvider.java     |  431 ++++---
 .../carbondata/presto/CarbondataSplit.java      |  110 +-
 .../presto/CarbondataSplitManager.java          |  449 ++++----
 .../presto/CarbondataTableHandle.java           |   75 +-
 .../presto/CarbondataTableLayoutHandle.java     |   79 +-
 .../presto/CarbondataTransactionHandle.java     |    6 +-
 .../org/apache/carbondata/presto/Types.java     |   19 +-
 .../presto/impl/CarbonLocalInputSplit.java      |  103 +-
 .../presto/impl/CarbonTableCacheModel.java      |   24 +-
 .../presto/impl/CarbonTableConfig.java          |   68 +-
 .../presto/impl/CarbonTableReader.java          | 1088 +++++++++---------
 .../GrtLtFilterProcessorTestCase.scala          |   18 +-
 25 files changed, 1906 insertions(+), 2073 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-carbondata git commit: [CARBONDATA-826] Create carbondata-connector for query carbon data in presto

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 95895fc..85c53ad 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -56,241 +56,218 @@ import static java.util.Objects.requireNonNull;
 
 public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
-    private final String connectorId;
-    private final CarbonTableReader carbonTableReader;
-
-    @Inject
-    public CarbondataRecordSetProvider(
-            CarbondataConnectorId connectorId,
-            CarbonTableReader reader)
-    {
-        //this.config = requireNonNull(config, "config is null");
-        //this.connector = requireNonNull(connector, "connector is null");
-        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-        this.carbonTableReader = reader;
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    //this.config = requireNonNull(config, "config is null");
+    //this.connector = requireNonNull(connector, "connector is null");
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = reader;
+  }
+
+  @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+    requireNonNull(split, "split is null");
+    requireNonNull(columns, "columns is null");
+
+    // Convert split
+    CarbondataSplit cdSplit =
+        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+    // Convert all columns handles
+    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    for (ColumnHandle handle : columns) {
+      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
     }
 
-    @Override
-    public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
-        requireNonNull(split, "split is null");
-        requireNonNull(columns, "columns is null");
-
-        // Convert split
-        CarbondataSplit cdSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-        checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
-
-        // Convert all columns handles
-        ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
-        for (ColumnHandle handle : columns) {
-            handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
-        }
-
-        // Build column projection(check the column order)
-        String targetCols = "";
-        for(ColumnHandle col : columns){
-            targetCols += ((CarbondataColumnHandle)col).getColumnName() + ",";
-        }
-        targetCols = targetCols.substring(0, targetCols.length() -1 );
-        //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
-
-        CarbonTableCacheModel tableCacheModel = carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
-        checkNotNull(tableCacheModel, "tableCacheModel should not be null");
-        checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
-        checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
-
-        // Build Query Model
-        CarbonTable targetTable = tableCacheModel.carbonTable;
-        CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
-        QueryModel queryModel = QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
-
-        // Push down filter
-        fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
-
-        // Return new record set
-        return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit, handles.build(), queryModel);
+    // Build column projection(check the column order)
+    String targetCols = "";
+    for (ColumnHandle col : columns) {
+      targetCols += ((CarbondataColumnHandle) col).getColumnName() + ",";
     }
-
-    // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
-    private void fillFilter2QueryModel(QueryModel queryModel, TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
-
-        //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
-
-        //Build Predicate Expression
-        ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
-        Domain domain = null;
-
-        for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
-            // Build ColumnExpresstion for Expresstion(Carbondata)
-            CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-            Type type = cdch.getColumnType();
-
-            DataType coltype = Spi2CarbondataTypeMapper(type);
-            Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
-
-            domain = originalConstraint.getDomains().get().get(c);
-            checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
-            if (domain.getValues().isNone()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
-                //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
-                //new Expression()
-            }
-
-            if (domain.getValues().isAll()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
-                //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
-            }
-
-            List<Object> singleValues = new ArrayList<>();
-            List<Expression> rangeFilter = new ArrayList<>();
-            for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
-                checkState(!range.isAll()); // Already checked
-                if (range.isSingleValue()) {
-                    singleValues.add(range.getLow().getValue());
-                }
-                else
-                {
-                    List<String> rangeConjuncts = new ArrayList<>();
-                    if (!range.getLow().isLowerUnbounded()) {
-                        Object value = ConvertDataByType(range.getLow().getValue(), type);
-                        switch (range.getLow().getBound()) {
-                            case ABOVE:
-                                if (type == TimestampType.TIMESTAMP) {
-                                    //todo not now
-                                } else {
-                                    GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                    //greater.setRangeExpression(true);
-                                    rangeFilter.add(greater);
-                                }
-                                break;
-                            case EXACTLY:
-                                GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //greater.setRangeExpression(true);
-                                rangeFilter.add(greater);
-                                break;
-                            case BELOW:
-                                throw new IllegalArgumentException("Low marker should never use BELOW bound");
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
-                        }
-                    }
-                    if (!range.getHigh().isUpperUnbounded()) {
-                        Object value = ConvertDataByType(range.getHigh().getValue(), type);
-                        switch (range.getHigh().getBound()) {
-                            case ABOVE:
-                                throw new IllegalArgumentException("High marker should never use ABOVE bound");
-                            case EXACTLY:
-                                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less.setRangeExpression(true);
-                                rangeFilter.add(less);
-                                break;
-                            case BELOW:
-                                LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less2.setRangeExpression(true);
-                                rangeFilter.add(less2);
-                                break;
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
-                        }
-                    }
+    targetCols = targetCols.substring(0, targetCols.length() - 1);
+    //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+    CarbonTableCacheModel tableCacheModel =
+        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+    // Build Query Model
+    CarbonTable targetTable = tableCacheModel.carbonTable;
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+    QueryModel queryModel =
+        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+    // Push down filter
+    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+    // Return new record set
+    return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+        handles.build(), queryModel);
+  }
+
+  // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+  private void fillFilter2QueryModel(QueryModel queryModel,
+      TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+    //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+    //Build Predicate Expression
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpresstion for Expresstion(Carbondata)
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      DataType coltype = Spi2CarbondataTypeMapper(type);
+      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+        //new Expression()
+      }
+
+      if (domain.getValues().isAll()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  //greater.setRangeExpression(true);
+                  rangeFilter.add(greater);
                 }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                //greater.setRangeExpression(true);
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
             }
-
-            if (singleValues.size() == 1) {
-                Expression ex = null;
-                if (coltype.equals(DataType.STRING)) {
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
-                } else
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
-                filters.add(ex);
-            }
-            else if(singleValues.size() > 1) {
-                ListExpression candidates = null;
-                List<Expression> exs = singleValues.stream().map((a) ->
-                {
-                    return new LiteralExpression(ConvertDataByType(a, type), coltype);
-                }).collect(Collectors.toList());
-                candidates = new ListExpression(exs);
-
-                if(candidates != null)
-                    filters.add(new InExpression(colExpression, candidates));
-            }
-            else if(rangeFilter.size() > 0){
-                if(rangeFilter.size() > 1) {
-                    Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
-                    if(rangeFilter.size() > 2)
-                    {
-                        for(int i = 2; i< rangeFilter.size(); i++)
-                        {
-                            filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
-                        }
-                    }
-                }
-                else if(rangeFilter.size() == 1)
-                    filters.add(rangeFilter.get(0));
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                //less.setRangeExpression(true);
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                //less2.setRangeExpression(true);
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
             }
+          }
         }
-
-        Expression finalFilters;
-        List<Expression> tmp = filters.build();
-        if(tmp.size() > 1) {
-            finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
-            if(tmp.size() > 2)
-            {
-                for(int i = 2; i< tmp.size(); i++)
-                {
-                    finalFilters = new AndExpression(finalFilters, tmp.get(i));
-                }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
             }
-        }
-        else if(tmp.size() == 1)
-            finalFilters = tmp.get(0);
-        else
-            return;
-
-        // todo set into QueryModel
-        CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
-        queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
-    }
-
-    public static DataType Spi2CarbondataTypeMapper(Type colType)
-    {
-        if(colType == BooleanType.BOOLEAN)
-            return DataType.BOOLEAN;
-        else if(colType == SmallintType.SMALLINT)
-            return DataType.SHORT;
-        else if(colType == IntegerType.INTEGER)
-            return DataType.INT;
-        else if(colType == BigintType.BIGINT)
-            return DataType.LONG;
-        else if(colType == DoubleType.DOUBLE)
-            return DataType.DOUBLE;
-        else if(colType == DecimalType.createDecimalType())
-            return DataType.DECIMAL;
-        else if(colType == VarcharType.VARCHAR)
-            return DataType.STRING;
-        else if(colType == DateType.DATE)
-            return DataType.DATE;
-        else if(colType == TimestampType.TIMESTAMP)
-            return DataType.TIMESTAMP;
-        else
-            return DataType.STRING;
+          }
+        } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0));
+      }
     }
 
-
-    public Object ConvertDataByType(Object rawdata, Type type)
-    {
-        if(type.equals(IntegerType.INTEGER))
-            return new Integer((rawdata.toString()));
-        else if(type.equals(BigintType.BIGINT))
-            return (Long)rawdata;
-        else if(type.equals(VarcharType.VARCHAR))
-            return ((Slice)rawdata).toStringUtf8();
-        else if(type.equals(BooleanType.BOOLEAN))
-            return (Boolean)(rawdata);
-
-        return rawdata;
-    }
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else return;
+
+    // todo set into QueryModel
+    CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+    queryModel.setFilterExpressionResolverTree(
+        CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+  }
+
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
index 741dfcc..ecc41ef 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
@@ -33,64 +33,56 @@ import static java.util.Objects.requireNonNull;
 
 public class CarbondataSplit implements ConnectorSplit {
 
-    private final String connectorId;
-    private final SchemaTableName schemaTableName;
-    private final TupleDomain<ColumnHandle> constraints;
-    private final CarbonLocalInputSplit localInputSplit;
-    private final List<CarbondataColumnConstraint> rebuildConstraints;
-    private final ImmutableList<HostAddress> addresses;
-
-    @JsonCreator
-    public CarbondataSplit( @JsonProperty("connectorId") String connectorId,
-                            @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
-                            @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
-                            @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
-                            @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
-        this.connectorId = requireNonNull(connectorId, "connectorId is null");
-        this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
-        this.constraints = requireNonNull(constraints, "constraints is null");
-        this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
-        this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
-        this.addresses = ImmutableList.of();
-    }
-
-
-    @JsonProperty
-    public String getConnectorId() {
-        return connectorId;
-    }
-
-    @JsonProperty
-    public SchemaTableName getSchemaTableName(){
-        return  schemaTableName;
-    }
-
-    @JsonProperty
-    public TupleDomain<ColumnHandle> getConstraints() {
-        return constraints;
-    }
-
-    @JsonProperty
-    public CarbonLocalInputSplit getLocalInputSplit(){return localInputSplit;}
-
-    @JsonProperty
-    public List<CarbondataColumnConstraint> getRebuildConstraints() {
-        return rebuildConstraints;
-    }
-
-    @Override
-    public boolean isRemotelyAccessible() {
-        return true;
-    }
-
-    @Override
-    public List<HostAddress> getAddresses() {
-        return addresses;
-    }
-
-    @Override
-    public Object getInfo() {
-        return this;
-    }
+  private final String connectorId;
+  private final SchemaTableName schemaTableName;
+  private final TupleDomain<ColumnHandle> constraints;
+  private final CarbonLocalInputSplit localInputSplit;
+  private final List<CarbondataColumnConstraint> rebuildConstraints;
+  private final ImmutableList<HostAddress> addresses;
+
+  @JsonCreator public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+      @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
+      @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
+      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
+    this.constraints = requireNonNull(constraints, "constraints is null");
+    this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
+    this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
+    this.addresses = ImmutableList.of();
+  }
+
+  @JsonProperty public String getConnectorId() {
+    return connectorId;
+  }
+
+  @JsonProperty public SchemaTableName getSchemaTableName() {
+    return schemaTableName;
+  }
+
+  @JsonProperty public TupleDomain<ColumnHandle> getConstraints() {
+    return constraints;
+  }
+
+  @JsonProperty public CarbonLocalInputSplit getLocalInputSplit() {
+    return localInputSplit;
+  }
+
+  @JsonProperty public List<CarbondataColumnConstraint> getRebuildConstraints() {
+    return rebuildConstraints;
+  }
+
+  @Override public boolean isRemotelyAccessible() {
+    return true;
+  }
+
+  @Override public List<HostAddress> getAddresses() {
+    return addresses;
+  }
+
+  @Override public Object getInfo() {
+    return this;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index 86390e3..a8902eb 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -50,255 +50,224 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataSplitManager
-        implements ConnectorSplitManager
-{
-
-    private final String connectorId;
-    private final CarbonTableReader carbonTableReader;
-
-    @Inject
-    public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
-    {
-        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-        this.carbonTableReader = requireNonNull(reader, "client is null");
-    }
-
-    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
-    {
-        CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout;
-        CarbondataTableHandle tableHandle = layoutHandle.getTable();
-        SchemaTableName key = tableHandle.getSchemaTableName();
-
-        //get all filter domain
-        List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
-
-        CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
-        Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
-
-        if(cache != null) {
-            try {
-                List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
-
-                ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
-                for (CarbonLocalInputSplit split : splits) {
-                    cSplits.add(new CarbondataSplit(
-                            connectorId,
-                            tableHandle.getSchemaTableName(),
-                            layoutHandle.getConstraint(),
-                            split,
-                            rebuildConstraints
-                    ));
-                }
-                return new FixedSplitSource(cSplits.build());
-            } catch (Exception ex) {
-                System.out.println(ex.toString());
-            }
+public class CarbondataSplitManager implements ConnectorSplitManager {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorTableLayoutHandle layout) {
+    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+    CarbondataTableHandle tableHandle = layoutHandle.getTable();
+    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    //get all filter domain
+    List<CarbondataColumnConstraint> rebuildConstraints =
+        getColumnConstraints(layoutHandle.getConstraint());
+
+    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+    if (cache != null) {
+      try {
+        List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
+
+        ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+        for (CarbonLocalInputSplit split : splits) {
+          cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
+              layoutHandle.getConstraint(), split, rebuildConstraints));
         }
-        return null;
+        return new FixedSplitSource(cSplits.build());
+      } catch (Exception ex) {
+        System.out.println(ex.toString());
+      }
     }
-
-
-    public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
-    {
-        ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
-        for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) {
-            CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
-            constraintBuilder.add(new CarbondataColumnConstraint(
-                    columnHandle.getColumnName(),
-                    Optional.of(columnDomain.getDomain()),
-                    columnHandle.isInvertedIndex()));
-        }
-
-        return constraintBuilder.build();
+    return null;
+  }
+
+  public List<CarbondataColumnConstraint> getColumnConstraints(
+      TupleDomain<ColumnHandle> constraint) {
+    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
+        .get()) {
+      CarbondataColumnHandle columnHandle =
+          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
+          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
     }
 
-
-    public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
-    {
-        ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
-        Domain domain = null;
-
-        for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
-            CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-            Type type = cdch.getColumnType();
-
-            List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
-            Optional<CarbonColumn>  target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
-
-            if(target.get() == null)
-                return null;
-
-            DataType coltype = target.get().getDataType();
-            ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
-            //colExpression.setColIndex(cs.getSchemaOrdinal());
-            colExpression.setDimension(target.get().isDimesion());
-            colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
-            colExpression.setCarbonColumn(target.get());
-
-            domain = originalConstraint.getDomains().get().get(c);
-            checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
-            if (domain.getValues().isNone()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
-                //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
-                //new Expression()
-            }
-
-            if (domain.getValues().isAll()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
-                //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
-            }
-
-            List<Object> singleValues = new ArrayList<>();
-            List<Expression> rangeFilter = new ArrayList<>();
-            for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
-                checkState(!range.isAll()); // Already checked
-                if (range.isSingleValue()) {
-                    singleValues.add(range.getLow().getValue());
-                }
-                else
-                {
-                    List<String> rangeConjuncts = new ArrayList<>();
-                    if (!range.getLow().isLowerUnbounded()) {
-                        Object value = ConvertDataByType(range.getLow().getValue(), type);
-                        switch (range.getLow().getBound()) {
-                            case ABOVE:
-                                if (type == TimestampType.TIMESTAMP) {
-                                    //todo not now
-                                } else {
-                                    GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                    //greater.setRangeExpression(true);
-                                    rangeFilter.add(greater);
-                                }
-                                break;
-                            case EXACTLY:
-                                GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //greater.setRangeExpression(true);
-                                rangeFilter.add(greater);
-                                break;
-                            case BELOW:
-                                throw new IllegalArgumentException("Low marker should never use BELOW bound");
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
-                        }
-                    }
-                    if (!range.getHigh().isUpperUnbounded()) {
-                        Object value = ConvertDataByType(range.getHigh().getValue(), type);
-                        switch (range.getHigh().getBound()) {
-                            case ABOVE:
-                                throw new IllegalArgumentException("High marker should never use ABOVE bound");
-                            case EXACTLY:
-                                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less.setRangeExpression(true);
-                                rangeFilter.add(less);
-                                break;
-                            case BELOW:
-                                LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less2.setRangeExpression(true);
-                                rangeFilter.add(less2);
-                                break;
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
-                        }
-                    }
+    return constraintBuilder.build();
+  }
+
+  public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
+      CarbonTable carbonTable) {
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+      Optional<CarbonColumn> target =
+          ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+
+      if (target.get() == null) return null;
+
+      DataType coltype = target.get().getDataType();
+      ColumnExpression colExpression =
+          new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
+      //colExpression.setColIndex(cs.getSchemaOrdinal());
+      colExpression.setDimension(target.get().isDimesion());
+      colExpression.setDimension(
+          carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+      colExpression.setCarbonColumn(target.get());
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+        //new Expression()
+      }
+
+      if (domain.getValues().isAll()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  //greater.setRangeExpression(true);
+                  rangeFilter.add(greater);
                 }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                //greater.setRangeExpression(true);
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
             }
-
-            if (singleValues.size() == 1) {
-                Expression ex = null;
-                if (coltype.equals(DataType.STRING)) {
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
-                } else
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
-                filters.add(ex);
-            }
-            else if(singleValues.size() > 1) {
-                ListExpression candidates = null;
-                List<Expression> exs = singleValues.stream().map((a) ->
-                {
-                    return new LiteralExpression(ConvertDataByType(a, type), coltype);
-                }).collect(Collectors.toList());
-                candidates = new ListExpression(exs);
-
-                if(candidates != null)
-                    filters.add(new InExpression(colExpression, candidates));
-            }
-            else if(rangeFilter.size() > 0){
-                if(rangeFilter.size() > 1) {
-                    Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
-                    if(rangeFilter.size() > 2)
-                    {
-                        for(int i = 2; i< rangeFilter.size(); i++)
-                        {
-                            filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
-                        }
-                    }
-                }
-                else if(rangeFilter.size() == 1)//only have one value
-                    filters.add(rangeFilter.get(0));
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                //less.setRangeExpression(true);
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                //less2.setRangeExpression(true);
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
             }
+          }
         }
-
-        Expression finalFilters;
-        List<Expression> tmp = filters.build();
-        if(tmp.size() > 1) {
-            finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
-            if(tmp.size() > 2)
-            {
-                for(int i = 2; i< tmp.size(); i++)
-                {
-                    finalFilters = new AndExpression(finalFilters, tmp.get(i));
-                }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
             }
-        }
-        else if(tmp.size() == 1)
-            finalFilters = tmp.get(0);
-        else//no filter
-            return null;
-
-        return finalFilters;
-    }
-
-    public static DataType Spi2CarbondataTypeMapper(Type colType)
-    {
-        if(colType == BooleanType.BOOLEAN)
-            return DataType.BOOLEAN;
-        else if(colType == SmallintType.SMALLINT)
-            return DataType.SHORT;
-        else if(colType == IntegerType.INTEGER)
-            return DataType.INT;
-        else if(colType == BigintType.BIGINT)
-            return DataType.LONG;
-        else if(colType == DoubleType.DOUBLE)
-            return DataType.DOUBLE;
-        else if(colType == DecimalType.createDecimalType())
-            return DataType.DECIMAL;
-        else if(colType == VarcharType.VARCHAR)
-            return DataType.STRING;
-        else if(colType == DateType.DATE)
-            return DataType.DATE;
-        else if(colType == TimestampType.TIMESTAMP)
-            return DataType.TIMESTAMP;
-        else
-            return DataType.STRING;
+          }
+        } else if (rangeFilter.size() == 1)//only have one value
+          filters.add(rangeFilter.get(0));
+      }
     }
 
-
-    public Object ConvertDataByType(Object rawdata, Type type)
-    {
-        if(type.equals(IntegerType.INTEGER))
-            return new Integer((rawdata.toString()));
-        else if(type.equals(BigintType.BIGINT))
-            return (Long)rawdata;
-        else if(type.equals(VarcharType.VARCHAR))
-            return ((Slice)rawdata).toStringUtf8();
-        else if(type.equals(BooleanType.BOOLEAN))
-            return (Boolean)(rawdata);
-
-        return rawdata;
-    }
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else//no filter
+      return null;
+
+    return finalFilters;
+  }
+
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
index b0caf52..0a3c820 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
@@ -28,57 +28,44 @@ import java.util.Objects;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataTableHandle
-        implements ConnectorTableHandle {
+public class CarbondataTableHandle implements ConnectorTableHandle {
 
-    private final String connectorId;
-    private final SchemaTableName schemaTableName;
+  private final String connectorId;
+  private final SchemaTableName schemaTableName;
 
-    @JsonCreator
-    public CarbondataTableHandle(
-            @JsonProperty("connectorId") String connectorId,
-            @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
-    {
-        this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
-        this.schemaTableName = schemaTableName;
-    }
-
-    @JsonProperty
-    public String getConnectorId()
-    {
-        return connectorId;
-    }
+  @JsonCreator public CarbondataTableHandle(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("schemaTableName") SchemaTableName schemaTableName) {
+    this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
+    this.schemaTableName = schemaTableName;
+  }
 
-    @JsonProperty
-    public SchemaTableName getSchemaTableName()
-    {
-        return schemaTableName;
-    }
+  @JsonProperty public String getConnectorId() {
+    return connectorId;
+  }
 
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(connectorId, schemaTableName);
-    }
+  @JsonProperty public SchemaTableName getSchemaTableName() {
+    return schemaTableName;
+  }
 
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
+  @Override public int hashCode() {
+    return Objects.hash(connectorId, schemaTableName);
+  }
 
-        CarbondataTableHandle other = (CarbondataTableHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName.equals(other.getSchemaTableName());
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
     }
-
-    @Override
-    public String toString()
-    {
-        return Joiner.on(":").join(connectorId, schemaTableName.toString());
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
     }
 
+    CarbondataTableHandle other = (CarbondataTableHandle) obj;
+    return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName
+        .equals(other.getSchemaTableName());
+  }
+
+  @Override public String toString() {
+    return Joiner.on(":").join(connectorId, schemaTableName.toString());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
index dc0506f..bf6318f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
@@ -29,60 +29,43 @@ import java.util.Objects;
 import static com.google.common.base.Objects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataTableLayoutHandle
-        implements ConnectorTableLayoutHandle
-{
-    private final CarbondataTableHandle table;
-    private final TupleDomain<ColumnHandle> constraint;
+public class CarbondataTableLayoutHandle implements ConnectorTableLayoutHandle {
+  private final CarbondataTableHandle table;
+  private final TupleDomain<ColumnHandle> constraint;
 
-    @JsonCreator
-    public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
-                                       @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
-    {
-        this.table = requireNonNull(table, "table is null");
-        this.constraint = requireNonNull(constraint, "constraint is null");
-    }
-
-    @JsonProperty
-    public CarbondataTableHandle getTable()
-    {
-        return table;
-    }
-
-    @JsonProperty
-    public TupleDomain<ColumnHandle> getConstraint()
-    {
-        return constraint;
-    }
+  @JsonCreator
+  public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
+      @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint) {
+    this.table = requireNonNull(table, "table is null");
+    this.constraint = requireNonNull(constraint, "constraint is null");
+  }
 
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
+  @JsonProperty public CarbondataTableHandle getTable() {
+    return table;
+  }
 
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
+  @JsonProperty public TupleDomain<ColumnHandle> getConstraint() {
+    return constraint;
+  }
 
-        CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
-        return Objects.equals(table, other.table)
-                && Objects.equals(constraint, other.constraint);
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
     }
 
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(table, constraint);
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
     }
 
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("table", table)
-                .add("constraint", constraint)
-                .toString();
-    }
+    CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
+    return Objects.equals(table, other.table) && Objects.equals(constraint, other.constraint);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(table, constraint);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("table", table).add("constraint", constraint).toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
index 06a84e2..e95c490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.presto;
 
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 
-public enum CarbondataTransactionHandle
-        implements ConnectorTransactionHandle
-{
-    INSTANCE
+public enum CarbondataTransactionHandle implements ConnectorTransactionHandle {
+  INSTANCE
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java b/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
index b7b0d90..cb30907 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/Types.java
@@ -23,16 +23,13 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 public class Types {
-    private Types() {}
+  private Types() {
+  }
 
-    public static <A, B extends A> B checkType(A value, Class<B> target, String name)
-    {
-        requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
-        checkArgument(target.isInstance(value),
-                "%s must be of type %s, not %s",
-                name,
-                target.getName(),
-                value.getClass().getName());
-        return target.cast(value);
-    }
+  public static <A, B extends A> B checkType(A value, Class<B> target, String name) {
+    requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
+    checkArgument(target.isInstance(value), "%s must be of type %s, not %s", name, target.getName(),
+        value.getClass().getName());
+    return target.cast(value);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 9cde7a6..9940061 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -24,70 +24,59 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class CarbonLocalInputSplit {
 
-    private static final long serialVersionUID = 3520344046772190207L;
-    private String segmentId;
-    private String path;
-    private long start;
-    private long length;
-    private List<String> locations;
-    private short version;
-    /**
-     * Number of BlockLets in a block
-     */
-    private int numberOfBlocklets = 0;
+  private static final long serialVersionUID = 3520344046772190207L;
+  private String segmentId;
+  private String path;
+  private long start;
+  private long length;
+  private List<String> locations;
+  private short version;
+  /**
+   * Number of BlockLets in a block
+   */
+  private int numberOfBlocklets = 0;
 
+  @JsonProperty public short getVersion() {
+    return version;
+  }
 
-    @JsonProperty
-    public short getVersion(){
-        return version;
-    }
+  @JsonProperty public List<String> getLocations() {
+    return locations;
+  }
 
-    @JsonProperty
-    public List<String> getLocations() {
-        return locations;
-    }
+  @JsonProperty public long getLength() {
+    return length;
+  }
 
-    @JsonProperty
-    public long getLength() {
-        return length;
-    }
+  @JsonProperty public long getStart() {
+    return start;
+  }
 
-    @JsonProperty
-    public long getStart() {
-        return start;
-    }
+  @JsonProperty public String getPath() {
+    return path;
+  }
 
-    @JsonProperty
-    public String getPath() {
-        return path;
-    }
+  @JsonProperty public String getSegmentId() {
+    return segmentId;
+  }
 
-    @JsonProperty
-    public String getSegmentId() {
-        return segmentId;
-    }
+  @JsonProperty public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
 
-    @JsonProperty
-    public int getNumberOfBlocklets() {
-        return numberOfBlocklets;
-    }
-
-    @JsonCreator
-    public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
-                                 @JsonProperty("path") String path,
-                                 @JsonProperty("start") long start,
-                                 @JsonProperty("length") long length,
-                                 @JsonProperty("locations") List<String> locations,
-                                 @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
+  @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+      @JsonProperty("path") String path, @JsonProperty("start") long start,
+      @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
+      @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
                                  @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
-                                 @JsonProperty("version") short version) {
-        this.path = path;
-        this.start = start;
-        this.length = length;
-        this.segmentId = segmentId;
-        this.locations = locations;
-        this.numberOfBlocklets = numberOfBlocklets;
-        //this.tableBlockInfo = tableBlockInfo;
-        this.version = version;
-    }
+      @JsonProperty("version") short version) {
+    this.path = path;
+    this.start = start;
+    this.length = length;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.numberOfBlocklets = numberOfBlocklets;
+    //this.tableBlockInfo = tableBlockInfo;
+    this.version = version;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index ee636b1..b138f18 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -24,21 +24,17 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonTableCacheModel {
 
-    public CarbonTableIdentifier carbonTableIdentifier;
-    public CarbonTablePath carbonTablePath;
+  public CarbonTableIdentifier carbonTableIdentifier;
+  public CarbonTablePath carbonTablePath;
 
-    public TableInfo tableInfo;
-    public CarbonTable carbonTable;
-    public String[] segments;
+  public TableInfo tableInfo;
+  public CarbonTable carbonTable;
+  public String[] segments;
 
-    public boolean isValid()
-    {
-        if(carbonTable != null
-                && carbonTablePath != null
-                && carbonTableIdentifier != null)
-            return true;
-        else
-            return false;
-    }
+  public boolean isValid() {
+    if (carbonTable != null && carbonTablePath != null && carbonTableIdentifier != null)
+      return true;
+    else return false;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index a682b66..a2b0a8c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -22,41 +22,35 @@ import io.airlift.configuration.Config;
 import javax.validation.constraints.NotNull;
 
 public class CarbonTableConfig {
-    //read from config
-    private String dbPtah;
-    private String tablePath;
-    private String storePath;
-
-    @NotNull
-    public String getDbPtah() {
-        return dbPtah;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setDbPtah(String dbPtah) {
-        this.dbPtah = dbPtah;
-        return this;
-    }
-
-    @NotNull
-    public String getTablePath() {
-        return tablePath;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setTablePath(String tablePath) {
-        this.tablePath = tablePath;
-        return this;
-    }
-
-    @NotNull
-    public String getStorePath() {
-        return storePath;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setStorePath(String storePath) {
-        this.storePath = storePath;
-        return this;
-    }
+  //read from config
+  private String dbPtah;
+  private String tablePath;
+  private String storePath;
+
+  @NotNull public String getDbPtah() {
+    return dbPtah;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setDbPtah(String dbPtah) {
+    this.dbPtah = dbPtah;
+    return this;
+  }
+
+  @NotNull public String getTablePath() {
+    return tablePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+    return this;
+  }
+
+  @NotNull public String getStorePath() {
+    return storePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setStorePath(String storePath) {
+    this.storePath = storePath;
+    return this;
+  }
 }


[3/4] incubator-carbondata git commit: [CARBONDATA-826] Create carbondata-connector for query carbon data in presto

Posted by ch...@apache.org.
[CARBONDATA-826] Create carbondata-connector for query carbon data in presto

fix guava version in pom

add readme for presto integration

add readme for presto integration

for travis ci

for travis ci


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/27123300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/27123300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/27123300

Branch: refs/heads/master
Commit: 27123300794228e36e6f4f5887b9087b174c1bb3
Parents: 08a6e81
Author: chenliang613 <ch...@huawei.com>
Authored: Mon Mar 27 16:40:20 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Wed Mar 29 14:34:26 2017 +0530

----------------------------------------------------------------------
 integration/presto/README.md                    |   86 ++
 integration/presto/pom.xml                      |   22 +-
 .../presto/CarbondataColumnConstraint.java      |  101 +-
 .../presto/CarbondataColumnHandle.java          |  205 ++--
 .../carbondata/presto/CarbondataConnector.java  |   87 +-
 .../presto/CarbondataConnectorFactory.java      |  119 +-
 .../presto/CarbondataConnectorId.java           |   49 +-
 .../presto/CarbondataHandleResolver.java        |   36 +-
 .../carbondata/presto/CarbondataMetadata.java   |  427 ++++---
 .../carbondata/presto/CarbondataModule.java     |   63 +-
 .../carbondata/presto/CarbondataPlugin.java     |   14 +-
 .../presto/CarbondataRecordCursor.java          |  190 ++-
 .../carbondata/presto/CarbondataRecordSet.java  |  110 +-
 .../presto/CarbondataRecordSetProvider.java     |  431 ++++---
 .../carbondata/presto/CarbondataSplit.java      |  110 +-
 .../presto/CarbondataSplitManager.java          |  449 ++++----
 .../presto/CarbondataTableHandle.java           |   75 +-
 .../presto/CarbondataTableLayoutHandle.java     |   79 +-
 .../presto/CarbondataTransactionHandle.java     |    6 +-
 .../org/apache/carbondata/presto/Types.java     |   19 +-
 .../presto/impl/CarbonLocalInputSplit.java      |  103 +-
 .../presto/impl/CarbonTableCacheModel.java      |   24 +-
 .../presto/impl/CarbonTableConfig.java          |   68 +-
 .../presto/impl/CarbonTableReader.java          | 1088 +++++++++---------
 .../GrtLtFilterProcessorTestCase.scala          |   18 +-
 25 files changed, 1906 insertions(+), 2073 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
new file mode 100644
index 0000000..8a7cd13
--- /dev/null
+++ b/integration/presto/README.md
@@ -0,0 +1,86 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+Please follow the below steps to query carbondata in presto
+
+### Config presto server
+* Download presto server 0.166 : https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
+* Finish configuration as per https://prestodb.io/docs/current/installation/deployment.html
+  for example:
+  ```
+  carbondata.properties:
+  connector.name=carbondata
+  carbondata-store=/Users/apple/DEMO/presto_test/data
+  
+  config.properties:
+  coordinator=true
+  node-scheduler.include-coordinator=true
+  http-server.http.port=8086
+  query.max-memory=5GB
+  query.max-memory-per-node=1GB
+  discovery-server.enabled=true
+  discovery.uri=http://localhost:8086
+  
+  jvm.config:
+  -server
+  -Xmx4G
+  -XX:+UseG1GC
+  -XX:G1HeapRegionSize=32M
+  -XX:+UseGCOverheadLimit
+  -XX:+ExplicitGCInvokesConcurrent
+  -XX:+HeapDumpOnOutOfMemoryError
+  -XX:OnOutOfMemoryError=kill -9 %p
+  -XX:+TraceClassLoading
+  
+  log.properties:
+  com.facebook.presto=DEBUG
+  com.facebook.presto.server.PluginManager=DEBUG
+  
+  node.properties:
+  node.environment=carbondata
+  node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
+  node.data-dir=/Users/apple/DEMO/presto_test/data
+  ```
+* config carbondata-connector for presto
+  
+  First:compile carbondata-presto integration module
+  ```
+  $ git clone https://github.com/apache/incubator-carbondata
+  $ cd incubator-carbondata/integration/presto
+  $ mvn clean package
+  ```
+  Second:create one folder "carbondata" under ./presto-server-0.166/plugin
+  Third:copy all jar from ./incubator-carbondata/integration/presto/target/carbondata-presto-1.1.0-incubating-SNAPSHOT
+        to ./presto-server-0.166/plugin/carbondata
+  
+### Generate CarbonData file
+
+Please refer to quick start : https://github.com/apache/incubator-carbondata/blob/master/docs/quick-start-guide.md
+
+### Query carbondata in CLI of presto
+* Download presto-cli-0.166-executable.jar
+
+* Start CLI:
+  
+  ```
+  $ ./presto-cli-0.166-executable.jar --server localhost:8086 --catalog carbondata --schema default
+  ```
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 5493df1..15e31fa 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -28,6 +28,7 @@
 
   <artifactId>carbondata-presto</artifactId>
   <name>Apache CarbonData :: presto</name>
+  <packaging>presto-plugin</packaging>
 
   <properties>
     <dev.path>${basedir}/../../dev</dev.path>
@@ -68,7 +69,7 @@
       <groupId>io.airlift</groupId>
       <artifactId>bootstrap</artifactId>
       <version>0.144</version>
-      <scope>provided</scope>
+      <!--<scope>provided</scope>-->
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
@@ -81,14 +82,14 @@
       <groupId>io.airlift</groupId>
       <artifactId>json</artifactId>
       <version>0.144</version>
-      <scope>provided</scope>
+      <!--<scope>provided</scope>-->
     </dependency>
 
     <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>log</artifactId>
       <version>0.144</version>
-      <scope>provided</scope>
+      <!--<scope>provided</scope>-->
     </dependency>
 
     <dependency>
@@ -115,7 +116,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>14.0.1</version>
+      <version>18.0</version>
     </dependency>
 
     <dependency>
@@ -129,7 +130,7 @@
       <groupId>com.facebook.presto</groupId>
       <artifactId>presto-spi</artifactId>
       <version>0.166</version>
-      <!--<scope>provided</scope>-->
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -168,7 +169,7 @@
             <failIfNoTests>false</failIfNoTests>
           </configuration>
         </plugin>
-        <!--
+
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-checkstyle-plugin</artifactId>
@@ -213,12 +214,19 @@
         </plugin>
 
         <plugin>
+          <groupId>io.takari.maven.plugins</groupId>
+          <artifactId>presto-maven-plugin</artifactId>
+          <version>0.1.12</version>
+          <extensions>true</extensions>
+        </plugin>
+
+        <plugin>
           <groupId>pl.project13.maven</groupId>
           <artifactId>git-commit-id-plugin</artifactId>
           <configuration>
             <skip>true</skip>
           </configuration>
-        </plugin>-->
+        </plugin>
       </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
index aad378e..0665aed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -30,75 +30,54 @@ import static com.google.common.base.Objects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
 public class CarbondataColumnConstraint {
-    private final String name;
-    private final boolean invertedindexed;
-    private Optional<Domain> domain;
+  private final String name;
+  private final boolean invertedindexed;
+  private Optional<Domain> domain;
 
-    @JsonCreator
-    public CarbondataColumnConstraint(
-            @JsonProperty("name") String name,
-            @JsonProperty("domain") Optional<Domain> domain,
-            @JsonProperty("invertedindexed") boolean invertedindexed)
-    {
-        this.name = requireNonNull(name, "name is null");
-        this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
-        this.domain = requireNonNull(domain, "domain is null");
-    }
-
-    @JsonProperty
-    public boolean isInvertedindexed()
-    {
-        return invertedindexed;
-    }
-
-    @JsonProperty
-    public String getName()
-    {
-        return name;
-    }
+  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
+      @JsonProperty("domain") Optional<Domain> domain,
+      @JsonProperty("invertedindexed") boolean invertedindexed) {
+    this.name = requireNonNull(name, "name is null");
+    this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+    this.domain = requireNonNull(domain, "domain is null");
+  }
 
-    @JsonProperty
-    public Optional<Domain> getDomain()
-    {
-        return domain;
-    }
+  @JsonProperty public boolean isInvertedindexed() {
+    return invertedindexed;
+  }
 
-    @JsonSetter
-    public void setDomain(Optional<Domain> domain)
-    {
-        this.domain = domain;
-    }
+  @JsonProperty public String getName() {
+    return name;
+  }
 
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(name, domain, invertedindexed);
-    }
+  @JsonProperty public Optional<Domain> getDomain() {
+    return domain;
+  }
 
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
+  @JsonSetter public void setDomain(Optional<Domain> domain) {
+    this.domain = domain;
+  }
 
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
+  @Override public int hashCode() {
+    return Objects.hash(name, domain, invertedindexed);
+  }
 
-        CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
-        return Objects.equals(this.name, other.name)
-                && Objects.equals(this.domain, other.domain)
-                && Objects.equals(this.invertedindexed, other.invertedindexed);
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
     }
 
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("name", this.name)
-                .add("invertedindexed", this.invertedindexed)
-                .add("domain", this.domain)
-                .toString();
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
     }
+
+    CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+    return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
+        && Objects.equals(this.invertedindexed, other.invertedindexed);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedindexed)
+        .add("domain", this.domain).toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index 2e3aef5..972a59c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -29,136 +29,111 @@ import java.util.Objects;
 import static com.google.common.base.Objects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataColumnHandle
-    implements ColumnHandle
-{
-        private final String connectorId;
-        private final String columnName;
-
-    public boolean isInvertedIndex() {
-        return isInvertedIndex;
-    }
+public class CarbondataColumnHandle implements ColumnHandle {
+  private final String connectorId;
+  private final String columnName;
 
-    private final Type columnType;
-    private final int ordinalPosition;
-    private final int keyOrdinal;
-    private final int columnGroupOrdinal;
+  public boolean isInvertedIndex() {
+    return isInvertedIndex;
+  }
 
-    private final int columnGroupId;
-    private final String columnUniqueId;
-    private final boolean isInvertedIndex;
+  private final Type columnType;
+  private final int ordinalPosition;
+  private final int keyOrdinal;
+  private final int columnGroupOrdinal;
 
-    public boolean isMeasure() {
-        return isMeasure;
-    }
+  private final int columnGroupId;
+  private final String columnUniqueId;
+  private final boolean isInvertedIndex;
 
-    private final boolean isMeasure;
+  public boolean isMeasure() {
+    return isMeasure;
+  }
 
-    public int getKeyOrdinal() {
-        return keyOrdinal;
-    }
+  private final boolean isMeasure;
 
-    public int getColumnGroupOrdinal() {
-        return columnGroupOrdinal;
-    }
+  public int getKeyOrdinal() {
+    return keyOrdinal;
+  }
 
-    public int getColumnGroupId() {
-        return columnGroupId;
-    }
+  public int getColumnGroupOrdinal() {
+    return columnGroupOrdinal;
+  }
 
-    public String getColumnUniqueId() {
-        return columnUniqueId;
-    }
+  public int getColumnGroupId() {
+    return columnGroupId;
+  }
+
+  public String getColumnUniqueId() {
+    return columnUniqueId;
+  }
     /* ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
         IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
         The columnhandle of clm3 : has ordinalposition = 3
      */
 
-    @JsonCreator
-    public CarbondataColumnHandle(
-            @JsonProperty("connectorId") String connectorId,
-            @JsonProperty("columnName") String columnName,
-            @JsonProperty("columnType") Type columnType,
-            @JsonProperty("ordinalPosition") int ordinalPosition,
-            @JsonProperty("keyOrdinal") int keyOrdinal,
-            @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
-            @JsonProperty("isMeasure") boolean isMeasure,
-            @JsonProperty("columnGroupId") int columnGroupId,
-            @JsonProperty("columnUniqueId") String columnUniqueId,
-            @JsonProperty("isInvertedIndex") boolean isInvertedIndex)
-    {
-        this.connectorId = requireNonNull(connectorId, "connectorId is null");
-        this.columnName = requireNonNull(columnName, "columnName is null");
-        this.columnType = requireNonNull(columnType, "columnType is null");
-
-        this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
-        this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
-        this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
-
-        this.isMeasure = isMeasure;
-        this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
-        this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
-        this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
-    }
-
-    @JsonProperty
-    public String getConnectorId()
-    {
-        return connectorId;
-    }
-
-    @JsonProperty
-    public String getColumnName()
-    {
-        return columnName;
-    }
-
-    @JsonProperty
-    public Type getColumnType()
-    {
-        return columnType;
-    }
-
-    @JsonProperty
-    public int getOrdinalPosition()
-    {
-        return ordinalPosition;
+  @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
+      @JsonProperty("ordinalPosition") int ordinalPosition,
+      @JsonProperty("keyOrdinal") int keyOrdinal,
+      @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+      @JsonProperty("isMeasure") boolean isMeasure,
+      @JsonProperty("columnGroupId") int columnGroupId,
+      @JsonProperty("columnUniqueId") String columnUniqueId,
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.columnName = requireNonNull(columnName, "columnName is null");
+    this.columnType = requireNonNull(columnType, "columnType is null");
+
+    this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
+    this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
+    this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
+
+    this.isMeasure = isMeasure;
+    this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
+    this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
+    this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+  }
+
+  @JsonProperty public String getConnectorId() {
+    return connectorId;
+  }
+
+  @JsonProperty public String getColumnName() {
+    return columnName;
+  }
+
+  @JsonProperty public Type getColumnType() {
+    return columnType;
+  }
+
+  @JsonProperty public int getOrdinalPosition() {
+    return ordinalPosition;
+  }
+
+  public ColumnMetadata getColumnMetadata() {
+    return new ColumnMetadata(columnName, columnType, null, false);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(connectorId, columnName);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
     }
-
-    public ColumnMetadata getColumnMetadata()
-    {
-        return new ColumnMetadata(columnName, columnType, null, false);
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
     }
 
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(connectorId, columnName);
-    }
+    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+    return Objects.equals(this.connectorId, other.connectorId) && Objects
+        .equals(this.columnName, other.columnName);
+  }
 
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
-
-        CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId) &&
-                Objects.equals(this.columnName, other.columnName);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("connectorId", connectorId)
-                .add("columnName", columnName)
-                .add("columnType", columnType)
-                .add("ordinalPosition", ordinalPosition)
-                .toString();
-    }
+  @Override public String toString() {
+    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
+        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
index 90b4944..0f1dbda 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -26,63 +26,50 @@ import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
 import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataConnector
-        implements Connector
-{
+public class CarbondataConnector implements Connector {
 
-    private static final Logger log = Logger.get(CarbondataConnector.class);
+  private static final Logger log = Logger.get(CarbondataConnector.class);
 
-    private final LifeCycleManager lifeCycleManager;
-    private final CarbondataMetadata metadata;
-    private final ConnectorSplitManager splitManager;
-    private final ConnectorRecordSetProvider recordSetProvider;
-    private final ClassLoader classLoader;
+  private final LifeCycleManager lifeCycleManager;
+  private final CarbondataMetadata metadata;
+  private final ConnectorSplitManager splitManager;
+  private final ConnectorRecordSetProvider recordSetProvider;
+  private final ClassLoader classLoader;
 
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ClassLoader classLoader) {
+    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.splitManager = requireNonNull(splitManager, "splitManager is null");
+    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+  }
 
-    public CarbondataConnector(LifeCycleManager lifeCycleManager,
-                               CarbondataMetadata metadata,
-                               ConnectorSplitManager splitManager,
-                               ConnectorRecordSetProvider recordSetProvider,
-                               ClassLoader classLoader)
-    {
-        this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
-        this.metadata = requireNonNull(metadata, "metadata is null");
-        this.splitManager = requireNonNull(splitManager, "splitManager is null");
-        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
-        this.classLoader = requireNonNull(classLoader, "classLoader is null");
-    }
-
-    @Override
-    public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) {
-        checkConnectorSupports(READ_COMMITTED, isolationLevel);
-        return CarbondataTransactionHandle.INSTANCE;
-    }
+  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+      boolean readOnly) {
+    checkConnectorSupports(READ_COMMITTED, isolationLevel);
+    return CarbondataTransactionHandle.INSTANCE;
+  }
 
-    @Override
-    public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
-        metadata.putClassLoader(classLoader);
-        return metadata;
-    }
+  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+    metadata.putClassLoader(classLoader);
+    return metadata;
+  }
 
-    @Override
-    public ConnectorSplitManager getSplitManager() {
-        return splitManager;
-    }
+  @Override public ConnectorSplitManager getSplitManager() {
+    return splitManager;
+  }
 
-    @Override
-    public ConnectorRecordSetProvider getRecordSetProvider()
-    {
-        return recordSetProvider;
-    }
+  @Override public ConnectorRecordSetProvider getRecordSetProvider() {
+    return recordSetProvider;
+  }
 
-    @Override
-    public final void shutdown()
-    {
-        try {
-            lifeCycleManager.stop();
-        }
-        catch (Exception e) {
-            log.error(e, "Error shutting down connector");
-        }
+  @Override public final void shutdown() {
+    try {
+      lifeCycleManager.stop();
+    } catch (Exception e) {
+      log.error(e, "Error shutting down connector");
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 324699c..b146a2e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -31,69 +31,60 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataConnectorFactory
-        implements ConnectorFactory {
-
-    private final String name;
-    private final ClassLoader classLoader;
-
-    public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader){
-        this.name = connectorName;
-        this.classLoader = requireNonNull(classLoader, "classLoader is null");
-    }
-
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public ConnectorHandleResolver getHandleResolver() {
-        return new CarbondataHandleResolver();
-    }
-
-    @Override
-    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
-        requireNonNull(config, "config is null");
-
-        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-            Bootstrap app = new Bootstrap(new JsonModule(), new CarbondataModule(connectorId, context.getTypeManager()));
-
-            Injector injector = app
-                    .strictConfig()
-                    .doNotInitializeLogging()
-                    .setRequiredConfigurationProperties(config)
-                    .initialize();
-
-            LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
-            CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
-            //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
-            ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
-            ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class);
-            //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
-
-            //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
-            //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
-            //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
-            //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
-            //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
-
-
-            return new CarbondataConnector(
-                    lifeCycleManager,
-                    metadata,
-                    new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
-                    connectorRecordSet,//new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
-                    classLoader
-                    //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
-                    //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
-                    //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
-            );
-        }
-        catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
+public class CarbondataConnectorFactory implements ConnectorFactory {
+
+  private final String name;
+  private final ClassLoader classLoader;
+
+  public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+    this.name = connectorName;
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public ConnectorHandleResolver getHandleResolver() {
+    return new CarbondataHandleResolver();
+  }
+
+  @Override public Connector create(String connectorId, Map<String, String> config,
+      ConnectorContext context) {
+    requireNonNull(config, "config is null");
+
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      Bootstrap app = new Bootstrap(new JsonModule(),
+          new CarbondataModule(connectorId, context.getTypeManager()));
+
+      Injector injector =
+          app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
+              .initialize();
+
+      LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+      CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
+      ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+      ConnectorRecordSetProvider connectorRecordSet =
+          injector.getInstance(ConnectorRecordSetProvider.class);
+      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
+
+      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
+      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
+      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+
+      return new CarbondataConnector(lifeCycleManager, metadata,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
+          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
+          classLoader
+          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+      );
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
index 5aa72f1..b4ba1dd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
@@ -23,39 +23,30 @@ import java.util.Objects;
 
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataConnectorId
-{
-    private final String id;
-
-    @Inject
-    public CarbondataConnectorId(String id)
-    {
-        this.id = requireNonNull(id, "id is null");
-    }
+public class CarbondataConnectorId {
+  private final String id;
 
-    @Override
-    public String toString()
-    {
-        return id;
-    }
+  @Inject public CarbondataConnectorId(String id) {
+    this.id = requireNonNull(id, "id is null");
+  }
 
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(id);
-    }
+  @Override public String toString() {
+    return id;
+  }
 
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
+  @Override public int hashCode() {
+    return Objects.hash(id);
+  }
 
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
 
-        return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
     }
+
+    return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
index 5918b46..7c65bfd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
@@ -21,29 +21,23 @@ import com.facebook.presto.spi.*;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 
 public class CarbondataHandleResolver implements ConnectorHandleResolver {
-    @Override
-    public Class<? extends ConnectorTableHandle> getTableHandleClass() {
-        return CarbondataTableHandle.class;
-    }
+  @Override public Class<? extends ConnectorTableHandle> getTableHandleClass() {
+    return CarbondataTableHandle.class;
+  }
 
-    @Override
-    public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
-        return CarbondataTableLayoutHandle.class;
-    }
+  @Override public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
+    return CarbondataTableLayoutHandle.class;
+  }
 
-    @Override
-    public Class<? extends ColumnHandle> getColumnHandleClass() {
-        return CarbondataColumnHandle.class;
-    }
+  @Override public Class<? extends ColumnHandle> getColumnHandleClass() {
+    return CarbondataColumnHandle.class;
+  }
 
-    @Override
-    public Class<? extends ConnectorSplit> getSplitClass() {
-        return CarbondataSplit.class;
-    }
+  @Override public Class<? extends ConnectorSplit> getSplitClass() {
+    return CarbondataSplit.class;
+  }
 
-    @Override
-    public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
-    {
-        return CarbondataTransactionHandle.class;
-    }
+  @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
+    return CarbondataTransactionHandle.class;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index 7fa96c3..524ce20 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -37,46 +37,36 @@ import static org.apache.carbondata.presto.Types.checkType;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-public class CarbondataMetadata
-        implements ConnectorMetadata
-{
-    private final String connectorId;
-    private CarbonTableReader carbonTableReader;
-    private ClassLoader classLoader;
-
-    private Map<String, ColumnHandle> columnHandleMap;
-
-    @Inject
-    public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader)
-    {
-        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-        this.carbonTableReader = requireNonNull(reader, "client is null");
+public class CarbondataMetadata implements ConnectorMetadata {
+  private final String connectorId;
+  private CarbonTableReader carbonTableReader;
+  private ClassLoader classLoader;
+
+  private Map<String, ColumnHandle> columnHandleMap;
+
+  @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public void putClassLoader(ClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+
+  @Override public List<String> listSchemaNames(ConnectorSession session) {
+    return listSchemaNamesInternal();
+  }
+
+  public List<String> listSchemaNamesInternal() {
+    List<String> ret;
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      ret = carbonTableReader.getSchemaNames();
     }
+    return ret;
+  }
 
-
-    public void putClassLoader(ClassLoader classLoader)
-    {
-        this.classLoader = classLoader;
-    }
-
-
-    @Override
-    public List<String> listSchemaNames(ConnectorSession session) {
-        return listSchemaNamesInternal();
-    }
-
-
-    public List<String> listSchemaNamesInternal()
-    {
-        List<String> ret;
-        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-            ret = carbonTableReader.getSchemaNames();
-        }
-        return ret;
-    }
-
-    @Override
-    public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+  @Override
+  public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
 
         /*List<SchemaTableName> all = carbonTableReader.getTableList();
         if(schemaNameOrNull != null)
@@ -85,225 +75,206 @@ public class CarbondataMetadata
         }
         return all;*/
 
-        List<String> schemaNames;
-        if (schemaNameOrNull != null) {
-            schemaNames = ImmutableList.of(schemaNameOrNull);
-        }
-        else {
-            schemaNames = carbonTableReader.getSchemaNames();
-        }
-
-        ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
-        for (String schemaName : schemaNames) {
-            for (String tableName : carbonTableReader.getTableNames(schemaName)) {
-                builder.add(new SchemaTableName(schemaName, tableName));
-            }
-        }
-        return builder.build();
+    List<String> schemaNames;
+    if (schemaNameOrNull != null) {
+      schemaNames = ImmutableList.of(schemaNameOrNull);
+    } else {
+      schemaNames = carbonTableReader.getSchemaNames();
     }
 
-    @Override
-    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
-        requireNonNull(prefix, "SchemaTablePrefix is null");
-
-        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
-        for (SchemaTableName tableName : listTables(session, prefix)) {
-            ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
-            if (tableMetadata != null) {
-                columns.put(tableName, tableMetadata.getColumns());
-            }
-        }
-        return columns.build();
+    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+    for (String schemaName : schemaNames) {
+      for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+        builder.add(new SchemaTableName(schemaName, tableName));
+      }
     }
-
-    //if prefix is null. return all tables
-    //if prefix is not null, just return this table
-    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
-    {
-        if (prefix.getSchemaName() == null) {
-            return listTables(session, prefix.getSchemaName());
-        }
-        return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+    return builder.build();
+  }
+
+  @Override
+  public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
+      SchemaTablePrefix prefix) {
+    requireNonNull(prefix, "SchemaTablePrefix is null");
+
+    ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+    for (SchemaTableName tableName : listTables(session, prefix)) {
+      ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+      if (tableMetadata != null) {
+        columns.put(tableName, tableMetadata.getColumns());
+      }
     }
-
-    private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
-    {
-        if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
-            return null;
-        }
-
-        CarbonTable cb = carbonTableReader.getTable(tableName);
-        if (cb == null) {
-            return null;
-        }
-
-        List<ColumnMetadata> spiCols = new LinkedList<>();
-        List<CarbonDimension> cols = cb.getDimensionByTableName(tableName.getTableName());
-        for(CarbonDimension col : cols)
-        {
-            //show columns command will return these data
-            Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-            ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
-            spiCols.add(spiCol);
-        }
-
-        List<CarbonMeasure> mcols = cb.getMeasureByTableName(tableName.getTableName());
-        for(CarbonMeasure mcol : mcols)
-        {
-            Type spiType = CarbondataType2SpiMapper(mcol.getColumnSchema().getDataType());
-            ColumnMetadata spiCol = new ColumnMetadata(mcol.getColumnSchema().getColumnName(), spiType);
-            spiCols.add(spiCol);
-        }
-
-        //\u5c01\u88c5carbonTable
-        return new ConnectorTableMetadata(tableName, spiCols);
+    return columns.build();
+  }
+
+  //if prefix is null. return all tables
+  //if prefix is not null, just return this table
+  private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
+    if (prefix.getSchemaName() == null) {
+      return listTables(session, prefix.getSchemaName());
     }
+    return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+  }
 
-    @Override
-    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
-
-        CarbondataTableHandle handle = checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-        checkArgument(handle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
-
-        String schemaName = handle.getSchemaTableName().getSchemaName();
-        if (!listSchemaNamesInternal().contains(schemaName)) {
-            throw new SchemaNotFoundException(schemaName);
-        }
-
-        //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
-        CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
-        if (cb == null) {
-            throw new TableNotFoundException(handle.getSchemaTableName());
-        }
-
-        ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
-        int index = 0;
-        String tableName = handle.getSchemaTableName().getTableName();
-        for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
-            ColumnSchema cs = column.getColumnSchema();
-
-            int complex = column.getComplexTypeOrdinal();
-            column.getNumberOfChild();
-            column.getListOfChildDimensions();
-
-            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
-            columnHandles.put(
-                    cs.getColumnName(),
-                    new CarbondataColumnHandle(
-                            connectorId,
-                            cs.getColumnName(),
-                            spiType,
-                            index,
-                            column.getKeyOrdinal(),
-                            column.getColumnGroupOrdinal(),
-                            false,
-                            cs.getColumnGroupId(),
-                            cs.getColumnUniqueId(),
-                            cs.isUseInvertedIndex()));
-            index++;
-        }
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
+    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+      return null;
+    }
 
-        for(CarbonMeasure measure : cb.getMeasureByTableName(tableName)){
-            ColumnSchema cs = measure.getColumnSchema();
-
-            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
-            columnHandles.put(
-                    cs.getColumnName(),
-                    new CarbondataColumnHandle(
-                            connectorId,
-                            cs.getColumnName(),
-                            spiType,
-                            index,
-                            measure.getOrdinal(),
-                            cs.getColumnGroupId(),
-                            true,
-                            cs.getColumnGroupId(),
-                            cs.getColumnUniqueId(),
-                            cs.isUseInvertedIndex()));
-            index++;
-        }
+    CarbonTable cb = carbonTableReader.getTable(tableName);
+    if (cb == null) {
+      return null;
+    }
 
-        //should i cache it?
-        columnHandleMap = columnHandles.build();
+    List<ColumnMetadata> spiCols = new LinkedList<>();
+    List<CarbonDimension> cols = cb.getDimensionByTableName(tableName.getTableName());
+    for (CarbonDimension col : cols) {
+      //show columns command will return these data
+      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+      spiCols.add(spiCol);
+    }
 
-        return columnHandleMap;
+    List<CarbonMeasure> mcols = cb.getMeasureByTableName(tableName.getTableName());
+    for (CarbonMeasure mcol : mcols) {
+      Type spiType = CarbondataType2SpiMapper(mcol.getColumnSchema().getDataType());
+      ColumnMetadata spiCol = new ColumnMetadata(mcol.getColumnSchema().getColumnName(), spiType);
+      spiCols.add(spiCol);
     }
 
-    @Override
-    public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+    //\u5c01\u88c5carbonTable
+    return new ConnectorTableMetadata(tableName, spiCols);
+  }
 
-        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-        return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle").getColumnMetadata();
-    }
+  @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
 
-    @Override
-    public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
-        //check tablename is valid
-        //schema is exist
-        //tables is exist
+    CarbondataTableHandle handle =
+        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    checkArgument(handle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
 
-        //CarbondataTable  get from jar
-        return new CarbondataTableHandle(connectorId, tableName);
+    String schemaName = handle.getSchemaTableName().getSchemaName();
+    if (!listSchemaNamesInternal().contains(schemaName)) {
+      throw new SchemaNotFoundException(schemaName);
     }
 
-    @Override
-    public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) {
-        CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
-        ConnectorTableLayout layout = new ConnectorTableLayout(new CarbondataTableLayoutHandle(handle,constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
-        return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+    //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+    CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+    if (cb == null) {
+      throw new TableNotFoundException(handle.getSchemaTableName());
     }
 
-    @Override
-    public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
-        return new ConnectorTableLayout(handle);
+    ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+    int index = 0;
+    String tableName = handle.getSchemaTableName().getTableName();
+    for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+      ColumnSchema cs = column.getColumnSchema();
+
+      int complex = column.getComplexTypeOrdinal();
+      column.getNumberOfChild();
+      column.getListOfChildDimensions();
+
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, index,
+              column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+      index++;
     }
 
-    @Override
-    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
-        return getTableMetadataInternal(table);
-    }
+    for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+      ColumnSchema cs = measure.getColumnSchema();
 
-    public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table)
-    {
-        CarbondataTableHandle carbondataTableHandle = checkType(table, CarbondataTableHandle.class, "table");
-        checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
-        return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, index,
+              measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+      index++;
     }
 
-
-    public static Type CarbondataType2SpiMapper(DataType colType)
-    {
-        switch (colType)
-        {
-            case BOOLEAN:
-                return BooleanType.BOOLEAN;
-            case SHORT:
-                return SmallintType.SMALLINT;
-            case INT:
-                return IntegerType.INTEGER;
-            case LONG:
-                return BigintType.BIGINT;
-            case FLOAT:
-            case DOUBLE:
-                return DoubleType.DOUBLE;
-
-            case DECIMAL:
-                return DecimalType.createDecimalType();
-            case STRING:
-                return VarcharType.VARCHAR;
-            case DATE:
-                return DateType.DATE;
-            case TIMESTAMP:
-                return TimestampType.TIMESTAMP;
+    //should i cache it?
+    columnHandleMap = columnHandles.build();
+
+    return columnHandleMap;
+  }
+
+  @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
+      ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+    checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
+        .getColumnMetadata();
+  }
+
+  @Override
+  public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+    //check tablename is valid
+    //schema is exist
+    //tables is exist
+
+    //CarbondataTable  get from jar
+    return new CarbondataTableHandle(connectorId, tableName);
+  }
+
+  @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
+      ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
+      Optional<Set<ColumnHandle>> desiredColumns) {
+    CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+    ConnectorTableLayout layout = new ConnectorTableLayout(
+        new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+  }
+
+  @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
+      ConnectorTableLayoutHandle handle) {
+    return new ConnectorTableLayout(handle);
+  }
+
+  @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
+      ConnectorTableHandle table) {
+    return getTableMetadataInternal(table);
+  }
+
+  public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
+    CarbondataTableHandle carbondataTableHandle =
+        checkType(table, CarbondataTableHandle.class, "table");
+    checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
+    return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+  }
+
+  public static Type CarbondataType2SpiMapper(DataType colType) {
+    switch (colType) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+      case SHORT:
+        return SmallintType.SMALLINT;
+      case INT:
+        return IntegerType.INTEGER;
+      case LONG:
+        return BigintType.BIGINT;
+      case FLOAT:
+      case DOUBLE:
+        return DoubleType.DOUBLE;
+
+      case DECIMAL:
+        return DecimalType.createDecimalType();
+      case STRING:
+        return VarcharType.VARCHAR;
+      case DATE:
+        return DateType.DATE;
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
 
             /*case DataType.MAP:
             case DataType.ARRAY:
             case DataType.STRUCT:
             case DataType.NULL:*/
 
-            default:
-                return VarcharType.VARCHAR;
-        }
+      default:
+        return VarcharType.VARCHAR;
     }
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index b329678..0baa64a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -38,46 +38,39 @@ import static java.util.Objects.requireNonNull;
 
 public class CarbondataModule implements Module {
 
-    private final String connectorId;
-    private final TypeManager typeManager;
+  private final String connectorId;
+  private final TypeManager typeManager;
 
-    public CarbondataModule(String connectorId, TypeManager typeManager)
-    {
-        this.connectorId = requireNonNull(connectorId, "connector id is null");
-        this.typeManager = requireNonNull(typeManager, "typeManager is null");
-    }
+  public CarbondataModule(String connectorId, TypeManager typeManager) {
+    this.connectorId = requireNonNull(connectorId, "connector id is null");
+    this.typeManager = requireNonNull(typeManager, "typeManager is null");
+  }
 
-    @Override
-    public void configure(Binder binder) {
-        binder.bind(TypeManager.class).toInstance(typeManager);
+  @Override public void configure(Binder binder) {
+    binder.bind(TypeManager.class).toInstance(typeManager);
 
-        binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
-        binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
-        binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
-        binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
-        binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
-        binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
-        configBinder(binder).bindConfig(CarbonTableConfig.class);
-    }
+    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(CarbonTableConfig.class);
+  }
 
-    public static final class TypeDeserializer
-            extends FromStringDeserializer<Type>
-    {
-        private final TypeManager typeManager;
+  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+    private final TypeManager typeManager;
 
-        @Inject
-        public TypeDeserializer(TypeManager typeManager)
-        {
-            super(Type.class);
-            this.typeManager = requireNonNull(typeManager, "typeManager is null");
-        }
+    @Inject public TypeDeserializer(TypeManager typeManager) {
+      super(Type.class);
+      this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
 
-        @Override
-        protected Type _deserialize(String value, DeserializationContext context)
-        {
-            Type type = typeManager.getType(parseTypeSignature(value));
-            checkArgument(type != null, "Unknown type %s", value);
-            return type;
-        }
+    @Override protected Type _deserialize(String value, DeserializationContext context) {
+      Type type = typeManager.getType(parseTypeSignature(value));
+      checkArgument(type != null, "Unknown type %s", value);
+      return type;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
index 3bcfe4f..191f13b 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPlugin.java
@@ -24,13 +24,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 public class CarbondataPlugin implements Plugin {
 
-    @Override
-    public Iterable<ConnectorFactory> getConnectorFactories()
-    {
-        return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
-    }
+  @Override public Iterable<ConnectorFactory> getConnectorFactories() {
+    return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
+  }
 
-    private static ClassLoader getClassLoader() {
-        return FileFactory.class.getClassLoader();
-    }
+  private static ClassLoader getClassLoader() {
+    return FileFactory.class.getClassLoader();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index 7d65efd..3314ac4 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -38,119 +38,107 @@ import static com.google.common.base.Preconditions.checkState;
 
 public class CarbondataRecordCursor implements RecordCursor {
 
-    private static final Logger log = Logger.get(CarbondataRecordCursor.class);
-    private final List<CarbondataColumnHandle> columnHandles;
-
-    private List<String> fields;
-    private CarbondataSplit split;
-    private CarbonIterator<Object[]> rowCursor;
-    private CarbonReadSupport<Object[]> readSupport;
-
-    private long totalBytes;
-    private long nanoStart;
-    private long nanoEnd;
-
-    public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport, CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles, CarbondataSplit split) {
-        this.rowCursor = carbonIterator;
-        this.columnHandles = columnHandles;
-        this.readSupport = readSupport;
-        this.totalBytes = 0;
-    }
-
-
-    @Override
-    public long getTotalBytes() {
-        return totalBytes;
-    }
-
-    @Override
-    public long getCompletedBytes() {
-        return totalBytes;
-    }
-
-    @Override
-    public long getReadTimeNanos() {
-        return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
-    }
-
-    @Override
-    public Type getType(int field) {
-
-        checkArgument(field < columnHandles.size(), "Invalid field index");
-        return columnHandles.get(field).getColumnType();
-    }
+  private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+  private final List<CarbondataColumnHandle> columnHandles;
 
-    @Override
-    public boolean advanceNextPosition() {
+  private List<String> fields;
+  private CarbondataSplit split;
+  private CarbonIterator<Object[]> rowCursor;
+  private CarbonReadSupport<Object[]> readSupport;
 
-        if (nanoStart == 0) {
-            nanoStart = System.nanoTime();
-        }
+  private long totalBytes;
+  private long nanoStart;
+  private long nanoEnd;
 
-        if(rowCursor.hasNext())
-        {
-            fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString()).collect(Collectors.toList());
+  public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+      CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
+      CarbondataSplit split) {
+    this.rowCursor = carbonIterator;
+    this.columnHandles = columnHandles;
+    this.readSupport = readSupport;
+    this.totalBytes = 0;
+  }
 
-            totalBytes += fields.size();
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean getBoolean(int field) {
-        checkFieldType(field, BOOLEAN);
-        return Boolean.parseBoolean(getFieldValue(field));
-    }
+  @Override public long getTotalBytes() {
+    return totalBytes;
+  }
 
-    @Override
-    public long getLong(int field) {
-        String timeStr = getFieldValue(field);
-        Long milliSec = 0L;
+  @Override public long getCompletedBytes() {
+    return totalBytes;
+  }
 
-        //suppose the
-        return Math.round(Double.parseDouble(getFieldValue(field)));
-    }
+  @Override public long getReadTimeNanos() {
+    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+  }
 
-    @Override
-    public double getDouble(int field) {
-        checkFieldType(field, DOUBLE);
-        return Double.parseDouble(getFieldValue(field));
-    }
+  @Override public Type getType(int field) {
 
-    @Override
-    public Slice getSlice(int field) {
-        checkFieldType(field, VARCHAR);
-        return Slices.utf8Slice(getFieldValue(field));
-    }
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return columnHandles.get(field).getColumnType();
+  }
 
-    @Override
-    public Object getObject(int field) {
-        return null;
-    }
-
-    @Override
-    public boolean isNull(int field) {
-        checkArgument(field < columnHandles.size(), "Invalid field index");
-        return Strings.isNullOrEmpty(getFieldValue(field));
-    }
-
-    String getFieldValue(int field)
-    {
-        checkState(fields != null, "Cursor has not been advanced yet");
-        return fields.get(field);
-    }
+  @Override public boolean advanceNextPosition() {
 
-    private void checkFieldType(int field, Type expected)
-    {
-        Type actual = getType(field);
-        checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual);
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
     }
 
-    @Override
-    public void close() {
-        nanoEnd = System.nanoTime();
+    if (rowCursor.hasNext()) {
+      fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString())
+          .collect(Collectors.toList());
 
-        //todo  delete cache from readSupport
+      totalBytes += fields.size();
+      return true;
     }
+    return false;
+  }
+
+  @Override public boolean getBoolean(int field) {
+    checkFieldType(field, BOOLEAN);
+    return Boolean.parseBoolean(getFieldValue(field));
+  }
+
+  @Override public long getLong(int field) {
+    String timeStr = getFieldValue(field);
+    Long milliSec = 0L;
+
+    //suppose the
+    return Math.round(Double.parseDouble(getFieldValue(field)));
+  }
+
+  @Override public double getDouble(int field) {
+    checkFieldType(field, DOUBLE);
+    return Double.parseDouble(getFieldValue(field));
+  }
+
+  @Override public Slice getSlice(int field) {
+    checkFieldType(field, VARCHAR);
+    return Slices.utf8Slice(getFieldValue(field));
+  }
+
+  @Override public Object getObject(int field) {
+    return null;
+  }
+
+  @Override public boolean isNull(int field) {
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return Strings.isNullOrEmpty(getFieldValue(field));
+  }
+
+  String getFieldValue(int field) {
+    checkState(fields != null, "Cursor has not been advanced yet");
+    return fields.get(field);
+  }
+
+  private void checkFieldType(int field, Type expected) {
+    Type actual = getType(field);
+    checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
+        expected, actual);
+  }
+
+  @Override public void close() {
+    nanoEnd = System.nanoTime();
+
+    //todo  delete cache from readSupport
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/27123300/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 737d2f1..af37728 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -43,74 +43,68 @@ import static org.apache.carbondata.presto.Types.checkType;
 
 public class CarbondataRecordSet implements RecordSet {
 
-    private CarbonTable carbonTable;
-    private TupleDomain<ColumnHandle> originalConstraint;
-    private Expression carbonConstraint;
-    private List<CarbondataColumnConstraint> rebuildConstraints;
-    private QueryModel queryModel;
-    private CarbondataSplit split;
-    private List<CarbondataColumnHandle> columns;
-    private QueryExecutor queryExecutor;
+  private CarbonTable carbonTable;
+  private TupleDomain<ColumnHandle> originalConstraint;
+  private Expression carbonConstraint;
+  private List<CarbondataColumnConstraint> rebuildConstraints;
+  private QueryModel queryModel;
+  private CarbondataSplit split;
+  private List<CarbondataColumnHandle> columns;
+  private QueryExecutor queryExecutor;
 
-    private CarbonReadSupport<Object[]> readSupport;
+  private CarbonReadSupport<Object[]> readSupport;
 
-    public CarbondataRecordSet(
-            CarbonTable carbonTable,
-            ConnectorSession session,
-            ConnectorSplit split,
-            List<CarbondataColumnHandle> columns,
-            QueryModel queryModel){
-        this.carbonTable = carbonTable;
-        this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
-        this.originalConstraint = this.split.getConstraints();
-        this.rebuildConstraints = this.split.getRebuildConstraints();
-        this.queryModel = queryModel;
-        this.columns = columns;
-        this.readSupport = new DictionaryDecodeReadSupport();
-    }
+  public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
+      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+    this.carbonTable = carbonTable;
+    this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+    this.originalConstraint = this.split.getConstraints();
+    this.rebuildConstraints = this.split.getRebuildConstraints();
+    this.queryModel = queryModel;
+    this.columns = columns;
+    this.readSupport = new DictionaryDecodeReadSupport();
+  }
 
-    //todo support later
-    private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
-        return null;
-    }
+  //todo support later
+  private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+    return null;
+  }
 
-    @Override
-    public List<Type> getColumnTypes() {
-        return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
-    }
+  @Override public List<Type> getColumnTypes() {
+    return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+  }
 
-    @Override
-    public RecordCursor cursor() {
-        List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+  @Override public RecordCursor cursor() {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
 
-        //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
+    //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
         /*BlockletInfos blockletInfos = new BlockletInfos(split.getLocalInputSplit().getNumberOfBlocklets(), 0,
                 split.getLocalInputSplit().getNumberOfBlocklets());*/
-        tableBlockInfoList.add(
-                new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
-                        split.getLocalInputSplit().getStart(),
-                        split.getLocalInputSplit().getSegmentId(),
-                        split.getLocalInputSplit().getLocations().toArray(new String[0]),
-                        split.getLocalInputSplit().getLength(),
-                        //blockletInfos,
-                        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
-        queryModel.setTableBlockInfos(tableBlockInfoList);
+    tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+        split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
+        split.getLocalInputSplit().getLocations().toArray(new String[0]),
+        split.getLocalInputSplit().getLength(),
+        //blockletInfos,
+        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+    queryModel.setTableBlockInfos(tableBlockInfoList);
 
-        queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
 
-        //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
-        try {
-            readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
-            CarbonIterator<Object[]> carbonIterator = new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
-            RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
-            return rc;
-        } catch (QueryExecutionException e) {
-            //throw new InterruptedException(e.getMessage());
-            System.out.println(e.getMessage());
-        } catch(Exception ex) {
-            System.out.println(ex.toString());
-        }
-        return null;
+    //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+    try {
+      readSupport
+          .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+      CarbonIterator<Object[]> carbonIterator =
+          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+      return rc;
+    } catch (QueryExecutionException e) {
+      //throw new InterruptedException(e.getMessage());
+      System.out.println(e.getMessage());
+    } catch (Exception ex) {
+      System.out.println(ex.toString());
     }
+    return null;
+  }
 }