You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/06/20 14:54:06 UTC

carbondata git commit: [HOTFIX] Added Performance Optimization for Presto by using MultiBlockSplit

Repository: carbondata
Updated Branches:
  refs/heads/master 0e1d550e8 -> 01b48fc36


[HOTFIX] Added Performance Optimization for Presto by using MultiBlockSplit

Added Performance Optimization for Presto by using MultiBlockSplit

This closes #2265


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/01b48fc3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/01b48fc3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/01b48fc3

Branch: refs/heads/master
Commit: 01b48fc36a93b74827edff4d3330cbf5546a5d38
Parents: 0e1d550
Author: Bhavya <bh...@knoldus.com>
Authored: Mon Apr 16 11:54:17 2018 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Wed Jun 20 22:53:38 2018 +0800

----------------------------------------------------------------------
 integration/presto/README.md                    |   9 +-
 integration/presto/pom.xml                      |  14 +-
 .../carbondata/presto/CarbonVectorBatch.java    |   2 +-
 .../carbondata/presto/CarbondataMetadata.java   |  87 +++++++-----
 .../carbondata/presto/CarbondataPageSource.java |  11 +-
 .../presto/CarbondataPageSourceProvider.java    |  27 ++--
 .../carbondata/presto/CarbondataSplit.java      |  24 +++-
 .../presto/CarbondataSplitManager.java          |  34 ++++-
 .../PrestoCarbonVectorizedRecordReader.java     |  39 +++++-
 .../presto/impl/CarbonLocalInputSplit.java      |  19 ++-
 .../presto/impl/CarbonLocalMultiBlockSplit.java |  86 ++++++++++++
 .../presto/impl/CarbonTableConfig.java          |  40 +++++-
 .../presto/impl/CarbonTableReader.java          | 133 ++++++++++++-------
 .../readers/DecimalSliceStreamReader.java       | 105 ++++++++-------
 .../presto/src/main/resources/log4j.properties  |  11 ++
 15 files changed, 462 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
index 53884a2..5a44f5c 100644
--- a/integration/presto/README.md
+++ b/integration/presto/README.md
@@ -77,6 +77,9 @@ Please follow the below steps to query carbondata in presto
   carbondata-store={schema-store-path}
   enable.unsafe.in.query.processing=false
   carbon.unsafe.working.memory.in.mb={value}
+  enable.unsafe.columnpage=false
+  enable.unsafe.sort=false
+
   ```
   Replace the schema-store-path with the absolute path of the parent directory of the schema.
   For example, if you have a schema named 'default' stored in hdfs://namenode:9000/test/carbondata/,
@@ -112,7 +115,11 @@ Please follow the below steps to query carbondata in presto
 ####  Unsafe Properties    
   enable.unsafe.in.query.processing property by default is true in CarbonData system, the carbon.unsafe.working.memory.in.mb 
   property defines the limit for Unsafe Memory usage in Mega Bytes, the default value is 512 MB.
-  If your tables are big you can increase the unsafe memory, or disable unsafe via setting enable.unsafe.in.query.processing=false.
+  Currently Presto does not support Unsafe Memory so we have to disable the unsafe feature by setting below properties to false.
+
+  enable.unsafe.in.query.processing=false.
+  enable.unsafe.columnpage=false
+  enable.unsafe.sort=false
 
   If you updated the jar balls or configuration files, make sure you have dispatched them
    to all the presto nodes and restarted the presto servers on the nodes. The updates will not take effect before restarting.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index b91f070..c61023a 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -463,12 +463,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-      <version>3.0.2</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>org.glassfish.hk2</groupId>
       <artifactId>hk2-api</artifactId>
       <version>2.5.0-b42</version>
@@ -552,13 +546,7 @@
     <testSourceDirectory>src/test/scala</testSourceDirectory>
     <resources>
       <resource>
-        <directory>src/resources</directory>
-      </resource>
-      <resource>
-        <directory>.</directory>
-        <includes>
-          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
-        </includes>
+        <directory>src/main/resources</directory>
       </resource>
     </resources>
     <plugins>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index b230d6a..b6caaa3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl
 
 public class CarbonVectorBatch {
 
-  private static final int DEFAULT_BATCH_SIZE = 1024;
+  private static final int DEFAULT_BATCH_SIZE =  4 * 1024;
 
   private final StructField[] schema;
   private final int capacity;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index 8be7494..256e405 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -17,32 +17,51 @@
 
 package org.apache.carbondata.presto;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.presto.impl.CarbonTableConfig;
-import org.apache.carbondata.presto.impl.CarbonTableReader;
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.type.*;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import javax.inject.Inject;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
 
-import javax.inject.Inject;
-import java.util.*;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaNotFoundException;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DateType;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.DoubleType;
+import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.VarcharType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
-import static org.apache.carbondata.presto.Types.checkType;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
-import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+import static org.apache.carbondata.presto.Types.checkType;
 
 public class CarbondataMetadata implements ConnectorMetadata {
   private final String connectorId;
@@ -119,8 +138,13 @@ public class CarbondataMetadata implements ConnectorMetadata {
     List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type columnType = carbonDataType2SpiMapper(col.getColumnSchema());
-      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), columnType);
+      ColumnSchema columnSchema = col.getColumnSchema();
+      Type columnType = carbonDataType2SpiMapper(columnSchema);
+      String extraValues =
+          columnSchema.getEncodingList().stream().map(encoding -> encoding.toString() + " ")
+              .reduce("", String::concat);
+      ColumnMetadata columnMeta =
+          new ColumnMetadata(columnSchema.getColumnName(), columnType, "", extraValues, false);
       columnsMetaList.add(columnMeta);
     }
 
@@ -152,22 +176,22 @@ public class CarbondataMetadata implements ConnectorMetadata {
 
       Type spiType = carbonDataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
-              column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
+              column.getSchemaOrdinal(), column.getKeyOrdinal(), column.getColumnGroupOrdinal(),
+              false, cs.getColumnGroupId(), cs.getColumnUniqueId(), cs.isUseInvertedIndex(),
+              cs.getPrecision(), cs.getScale()));
     }
 
     for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
       ColumnSchema cs = measure.getColumnSchema();
-
       Type spiType = carbonDataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
-              measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
+              cs.getSchemaOrdinal(), measure.getOrdinal(), cs.getColumnGroupId(), true,
+              cs.getColumnGroupId(), cs.getColumnUniqueId(), cs.isUseInvertedIndex(),
+              cs.getPrecision(), cs.getScale()));
     }
 
-    //should i cache it?
     columnHandleMap = columnHandles.build();
 
     return columnHandleMap;
@@ -183,12 +207,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
 
   @Override
   public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
-    //check tablename is valid
-    //schema is exist
-    //tables is exist
-
-    //CarbondataTable  get from jar
-    return new CarbondataTableHandle(connectorId, tableName);
+      return new CarbondataTableHandle(connectorId, tableName);
   }
 
   @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
@@ -196,7 +215,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
       Optional<Set<ColumnHandle>> desiredColumns) {
     CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
     ConnectorTableLayout layout = new ConnectorTableLayout(
-        new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+        new CarbondataTableLayoutHandle(handle, constraint.getSummary()));
     return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index a7682ce..d31010f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.presto;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -32,17 +31,13 @@ import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.Page;
-import com.facebook.presto.spi.PageBuilder;
 import com.facebook.presto.spi.PrestoException;
-import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.RecordSet;
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.LazyBlock;
 import com.facebook.presto.spi.block.LazyBlockLoader;
 import com.facebook.presto.spi.type.Type;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Collections.unmodifiableList;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -53,7 +48,6 @@ class CarbondataPageSource implements ConnectorPageSource {
   private static final LogService logger =
       LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
   private final List<Type> types;
-  private final PageBuilder pageBuilder;
   private boolean closed;
   private PrestoCarbonVectorizedRecordReader vectorReader;
   private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
@@ -69,7 +63,6 @@ class CarbondataPageSource implements ConnectorPageSource {
       List<ColumnHandle> columnHandles ) {
     this.columnHandles = columnHandles;
     this.types = getColumnTypes();
-    this.pageBuilder = new PageBuilder(this.types);
     this.readSupport = readSupport;
     vectorReader = vectorizedRecordReader;
     this.readers = createStreamReaders();
@@ -84,7 +77,7 @@ class CarbondataPageSource implements ConnectorPageSource {
   }
 
   @Override public boolean isFinished() {
-    return closed && pageBuilder.isEmpty();
+    return closed ;
   }
 
 
@@ -124,7 +117,6 @@ class CarbondataPageSource implements ConnectorPageSource {
         blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column, type));
       }
       Page page = new Page(batchSize, blocks);
-      sizeOfData += columnarBatch.capacity();
       return page;
     }
     catch (PrestoException e) {
@@ -197,6 +189,7 @@ class CarbondataPageSource implements ConnectorPageSource {
       checkState(batchId == expectedBatchId);
       try {
         Block block = readers[columnIndex].readBlock(type);
+        sizeOfData += block.getSizeInBytes();
         lazyBlock.setBlock(block);
       }
       catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index a268549..4679eac 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.presto;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -31,10 +30,12 @@ import org.apache.carbondata.core.scan.executor.exception.QueryExecutionExceptio
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
@@ -64,17 +65,18 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
 
   private String connectorId;
   private CarbonTableReader carbonTableReader;
+  private String queryId ;
 
   @Inject public CarbondataPageSourceProvider(CarbondataConnectorId connectorId,
       CarbonTableReader carbonTableReader) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
     this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
-
   }
 
   @Override
   public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
       ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    this.queryId = ((CarbondataSplit)split).getQueryId();
     CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
     PrestoCarbonVectorizedRecordReader carbonRecordReader = createReader(split, columns, readSupport);
     return new CarbondataPageSource(readSupport, carbonRecordReader, columns );
@@ -100,8 +102,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     try {
       CarbonIterator iterator = queryExecutor.execute(queryModel);
       readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
-      return new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
+      PrestoCarbonVectorizedRecordReader reader = new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
           (AbstractDetailQueryResultIterator) iterator);
+      reader.setTaskId(carbondataSplit.getIndex());
+      return reader;
     } catch (IOException e) {
       throw new RuntimeException("Unable to get the Query Model ", e);
     } catch (QueryExecutionException e) {
@@ -129,23 +133,28 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
 
       conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+      conf.set("query.id", queryId);
       JobConf jobConf = new JobConf(conf);
       CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
           PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
           carbonProjection);
       TaskAttemptContextImpl hadoopAttemptContext =
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonInputSplit carbonInputSplit =
-          CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
+      CarbonMultiBlockSplit carbonInputSplit =
+          CarbonLocalMultiBlockSplit.convertSplit(carbondataSplit.getLocalInputSplit());
       QueryModel queryModel =
           carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
+      queryModel.setQueryId(queryId);
       queryModel.setVectorReader(true);
+      queryModel.setStatisticsRecorder(
+          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
 
-      List<CarbonInputSplit> splitList = new ArrayList<>(1);
-      splitList.add(carbonInputSplit);
-      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+      List<TableBlockInfo> tableBlockInfoList =
+          CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
       queryModel.setTableBlockInfos(tableBlockInfoList);
 
+
+
       return queryModel;
     } catch (IOException e) {
       throw new RuntimeException("Unable to get the Query Model ", e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
index ecc41ef..8a3446b 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.presto;
 
 import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.HostAddress;
@@ -36,21 +38,27 @@ public class CarbondataSplit implements ConnectorSplit {
   private final String connectorId;
   private final SchemaTableName schemaTableName;
   private final TupleDomain<ColumnHandle> constraints;
-  private final CarbonLocalInputSplit localInputSplit;
+  private final CarbonLocalMultiBlockSplit localInputSplit;
   private final List<CarbondataColumnConstraint> rebuildConstraints;
   private final ImmutableList<HostAddress> addresses;
+  private final String queryId;
+  private final long index;
 
   @JsonCreator public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
       @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
       @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
-      @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
-      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
+      @JsonProperty("localInputSplit") CarbonLocalMultiBlockSplit localInputSplit,
+      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints,
+      @JsonProperty("queryId") String queryId,
+      @JsonProperty("index") long index) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null");
     this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
     this.constraints = requireNonNull(constraints, "constraints is null");
     this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
     this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
     this.addresses = ImmutableList.of();
+    this.queryId = queryId;
+    this.index = index;
   }
 
   @JsonProperty public String getConnectorId() {
@@ -65,7 +73,7 @@ public class CarbondataSplit implements ConnectorSplit {
     return constraints;
   }
 
-  @JsonProperty public CarbonLocalInputSplit getLocalInputSplit() {
+  @JsonProperty public CarbonLocalMultiBlockSplit getLocalInputSplit() {
     return localInputSplit;
   }
 
@@ -84,5 +92,13 @@ public class CarbondataSplit implements ConnectorSplit {
   @Override public Object getInfo() {
     return this;
   }
+
+  @JsonProperty public String getQueryId() {
+    return queryId;
+  }
+
+  @JsonProperty public long getIndex() {
+    return index;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index 3a54b22..13abd13 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -22,7 +22,11 @@ import java.util.List;
 import java.util.Optional;
 
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
@@ -62,7 +66,16 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     CarbondataTableHandle tableHandle = layoutHandle.getTable();
     SchemaTableName key = tableHandle.getSchemaTableName();
 
-    // Packaging presto-TupleDomain into CarbondataColumnConstraint, to decouple from presto-spi Module
+    String queryId = System.nanoTime() + "";
+    QueryStatistic statistic = new QueryStatistic();
+    QueryStatisticsRecorder statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis());
+    statisticRecorder.recordStatisticsForDriver(statistic, queryId);
+    statistic = new QueryStatistic();
+
+    carbonTableReader.setQueryId(queryId);
+    // Packaging presto-TupleDomain into CarbondataColumnConstraint,
+    // to decouple from presto-spi Module
     List<CarbondataColumnConstraint> rebuildConstraints =
         getColumnConstraints(layoutHandle.getConstraint());
 
@@ -70,14 +83,23 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     if (null != cache) {
       Expression filters = PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint());
       try {
-        List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters,
-                layoutHandle.getConstraint());
+        List<CarbonLocalMultiBlockSplit> splits =
+            carbonTableReader.getInputSplits2(cache, filters, layoutHandle.getConstraint());
 
         ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
-        for (CarbonLocalInputSplit split : splits) {
+        long index = 0;
+        for (CarbonLocalMultiBlockSplit split : splits) {
+          index++;
           cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
-              layoutHandle.getConstraint(), split, rebuildConstraints));
+              layoutHandle.getConstraint(), split, rebuildConstraints, queryId, index));
         }
+
+        statisticRecorder.logStatisticsAsTableDriver();
+
+        statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+            System.currentTimeMillis());
+        statisticRecorder.recordStatisticsForDriver(statistic, queryId);
+        statisticRecorder.logStatisticsAsTableDriver();
         return new FixedSplitSource(cSplits.build());
       } catch (Exception ex) {
         throw new RuntimeException(ex.getMessage(), ex);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index a1907db..913d423 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -39,6 +39,10 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.stats.TaskStatistics;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -72,11 +76,17 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
 
   private QueryExecutor queryExecutor;
 
-  public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) {
+  private long taskId;
+
+  private long queryStartTime;
+
+  public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel,
+      AbstractDetailQueryResultIterator iterator) {
     this.queryModel = queryModel;
     this.iterator = iterator;
     this.queryExecutor = queryExecutor;
     enableReturningBatches();
+    this.queryStartTime = System.currentTimeMillis();
   }
 
   /**
@@ -125,6 +135,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     } catch (QueryExecutionException e) {
       throw new IOException(e);
     }
+
+    logStatistics(taskId, queryStartTime, queryModel.getStatisticsRecorder());
   }
 
   @Override public boolean nextKeyValue() throws IOException, InterruptedException {
@@ -239,5 +251,30 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     return false;
   }
 
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Logs executor-side query statistics for the given task.
+   * @param taskId
+   * @param queryStartTime
+   * @param recorder
+   */
+  private void  logStatistics(
+      Long taskId,
+      Long queryStartTime,
+      QueryStatisticsRecorder recorder
+  ) {
+    if (null != recorder) {
+      QueryStatistic queryStatistic = new QueryStatistic();
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+          System.currentTimeMillis() - queryStartTime);
+      recorder.recordStatistics(queryStatistic);
+      // print executor query statistics for each task_id
+      TaskStatistics statistics = recorder.statisticsForTask(taskId, queryStartTime);
+      recorder.logStatisticsForTask(statistics);
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 2c6a810..af23671 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -42,6 +42,7 @@ public class CarbonLocalInputSplit {
   private List<String> locations;// locations are the locations for different replicas.
   private short version;
   private String[] deleteDeltaFiles;
+  private String blockletId;
 
 
   private String detailInfo;
@@ -87,6 +88,10 @@ public class CarbonLocalInputSplit {
     return detailInfo;
   }
 
+  @JsonProperty public String getBlockletId() {
+    return blockletId;
+  }
+
   public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) {
     Gson gson = new Gson();
     detailInfo = gson.toJson(blockletDetailInfo);
@@ -100,6 +105,7 @@ public class CarbonLocalInputSplit {
                                  @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
       @JsonProperty("version") short version,
       @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
+      @JsonProperty("blockletId") String blockletId,
       @JsonProperty("detailInfo") String detailInfo
   ) {
     this.path = path;
@@ -111,22 +117,23 @@ public class CarbonLocalInputSplit {
     //this.tableBlockInfo = tableBlockInfo;
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
+    this.blockletId = blockletId;
     this.detailInfo = detailInfo;
 
   }
 
   public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
-    CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(), "0",
-        new Path(carbonLocalInputSplit.getPath()), carbonLocalInputSplit.getStart(),
-        carbonLocalInputSplit.getLength(), carbonLocalInputSplit.getLocations()
-        .toArray(new String[carbonLocalInputSplit.getLocations().size()]),
+    CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(),
+        carbonLocalInputSplit.getBlockletId(), new Path(carbonLocalInputSplit.getPath()),
+        carbonLocalInputSplit.getStart(), carbonLocalInputSplit.getLength(),
+        carbonLocalInputSplit.getLocations()
+            .toArray(new String[carbonLocalInputSplit.getLocations().size()]),
         carbonLocalInputSplit.getNumberOfBlocklets(),
         ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
         carbonLocalInputSplit.getDeleteDeltaFiles());
     Gson gson = new Gson();
     BlockletDetailInfo blockletDetailInfo =
         gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
-
     if (null == blockletDetailInfo) {
       throw new RuntimeException("Could not read blocklet details");
     }
@@ -138,4 +145,6 @@ public class CarbonLocalInputSplit {
     inputSplit.setDetailInfo(blockletDetailInfo);
     return inputSplit;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
new file mode 100755
index 0000000..37174c1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * CarbonLocalMultiBlockSplit represents a set of blocks (CarbonLocalInputSplits) to be scanned by one task.
+ */
+public class CarbonLocalMultiBlockSplit {
+
+  private static final long serialVersionUID = 3520344046772190207L;
+
+  /*
+  * Splits (HDFS Blocks) for task to scan.
+  */
+  private List<CarbonLocalInputSplit> splitList;
+
+  /*
+   * The locations of all wrapped splits
+   */
+  private String[] locations;
+
+  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+
+  private long length;
+
+  @JsonProperty public long getLength() {
+    return length;
+  }
+
+  @JsonProperty public String[] getLocations() {
+    return locations;
+  }
+
+  @JsonProperty public List<CarbonLocalInputSplit> getSplitList() {
+    return splitList;
+  }
+
+  @JsonProperty public FileFormat getFileFormat() {
+    return fileFormat;
+  }
+
+  @JsonCreator public CarbonLocalMultiBlockSplit(
+      @JsonProperty("splitList") List<CarbonLocalInputSplit> splitList,
+      @JsonProperty("locations") String[] locations) {
+    this.splitList = splitList;
+    this.locations = locations;
+  }
+
+  public static CarbonMultiBlockSplit convertSplit(
+      CarbonLocalMultiBlockSplit carbonLocalMultiBlockSplit) {
+    List<CarbonInputSplit> carbonInputSplitList =
+        carbonLocalMultiBlockSplit.getSplitList().stream().map(CarbonLocalInputSplit::convertSplit)
+            .collect(Collectors.toList());
+
+    CarbonMultiBlockSplit carbonMultiBlockSplit =
+        new CarbonMultiBlockSplit(carbonInputSplitList, carbonLocalMultiBlockSplit.getLocations());
+
+    return carbonMultiBlockSplit;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index 75a7f11..f800e59 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -17,10 +17,10 @@
 
 package org.apache.carbondata.presto.impl;
 
-import javax.validation.constraints.NotNull;
-
 import io.airlift.configuration.Config;
 
+import javax.validation.constraints.NotNull;
+
 /**
  * Configuration read from etc/catalog/carbondata.properties
  */
@@ -32,6 +32,10 @@ public class CarbonTableConfig {
   private String storePath;
   private String unsafeMemoryInMb;
   private String enableUnsafeInQueryExecution;
+  private String enableUnsafeColumnPage;
+  private String enableUnsafeSort;
+  private String enableQueryStatistics;
+  private String batchSize;
   private String s3A_acesssKey;
   private String s3A_secretKey;
   private String s3_acesssKey;
@@ -88,6 +92,38 @@ public class CarbonTableConfig {
     return this;
   }
 
+  public String getEnableUnsafeColumnPage() { return enableUnsafeColumnPage; }
+
+  @Config("enable.unsafe.columnpage")
+  public CarbonTableConfig setEnableUnsafeColumnPage(String enableUnsafeColumnPage) {
+    this.enableUnsafeColumnPage = enableUnsafeColumnPage;
+    return this;
+  }
+
+  public String getEnableUnsafeSort() { return enableUnsafeSort; }
+
+  @Config("enable.unsafe.sort")
+  public CarbonTableConfig setEnableUnsafeSort(String enableUnsafeSort) {
+    this.enableUnsafeSort = enableUnsafeSort;
+    return this;
+  }
+
+  public String getEnableQueryStatistics() { return enableQueryStatistics; }
+
+  @Config("enable.query.statistics")
+  public CarbonTableConfig setEnableQueryStatistics(String enableQueryStatistics) {
+    this.enableQueryStatistics = enableQueryStatistics;
+    return this;
+  }
+
+  public String getBatchSize() { return batchSize; }
+
+  @Config("query.vector.batchSize")
+  public CarbonTableConfig setBatchSize(String batchSize) {
+    this.batchSize = batchSize;
+    return this;
+  }
+
   public String getS3A_AcesssKey() {
     return s3A_acesssKey;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 5866ad1..77c8ce5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -121,6 +121,8 @@ public class CarbonTableReader {
 
   private LoadMetadataDetails loadMetadataDetails[];
 
+  private String queryId;
+
   /**
    * Logger instance
    */
@@ -137,6 +139,7 @@ public class CarbonTableReader {
     this.carbonCache = new AtomicReference(new HashMap());
     tableList = new ConcurrentSet<>();
     setS3Properties();
+    populateCarbonProperties();
   }
 
   /**
@@ -218,10 +221,11 @@ public class CarbonTableReader {
     } else return ImmutableList.of();
   }
 
-  private void getName(CarbonFile carbonFile){
-  if(!carbonFile.getName().equalsIgnoreCase("_system") && !carbonFile.getName().equalsIgnoreCase(".ds_store")){
-    schemaNames.add(carbonFile.getName());
-  }
+  private void getName(CarbonFile carbonFile) {
+    if (!carbonFile.getName().equalsIgnoreCase("_system") && !carbonFile.getName()
+        .equalsIgnoreCase(".ds_store")) {
+      schemaNames.add(carbonFile.getName());
+    }
   }
 
   /**
@@ -243,11 +247,11 @@ public class CarbonTableReader {
    */
   private Set<String> updateTableList(String schemaName) {
     List<CarbonFile> schema =
-            Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
-                    .collect(Collectors.toList());
+        Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
+            .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName)
-              .collect(Collectors.toSet());
+          .collect(Collectors.toSet());
     } else return ImmutableSet.of();
   }
 
@@ -294,11 +298,11 @@ public class CarbonTableReader {
 
     if (isKeyExists) {
       CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
-      if(carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null) {
-        Long latestTime = FileFactory.getCarbonFile(
-            CarbonTablePath.getSchemaFilePath(
-                carbonCache.get().get(schemaTableName).carbonTable.getTablePath())
-        ).getLastModifiedTime();
+      if (carbonTableCacheModel != null
+          && carbonTableCacheModel.carbonTable.getTableInfo() != null) {
+        Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+            .getSchemaFilePath(carbonCache.get().get(schemaTableName).carbonTable.getTablePath()))
+            .getLastModifiedTime();
         Long oldTime = carbonTableCacheModel.carbonTable.getTableInfo().getLastUpdatedTime();
         if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
             .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
@@ -317,7 +321,6 @@ public class CarbonTableReader {
     }
   }
 
-
   /**
    * Find the table with the given name and build a CarbonTable instance for it.
    * This method should be called after this.updateSchemaTables().
@@ -401,19 +404,10 @@ public class CarbonTableReader {
     return result;
   }
 
-  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters, TupleDomain<ColumnHandle> constraints) throws IOException {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
-    if(config.getUnsafeMemoryInMb() != null) {
-      CarbonProperties.getInstance().addProperty(
-          CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
-          config.getUnsafeMemoryInMb());
-    }
-    if(config.getEnableUnsafeInQueryExecution() != null) {
-      CarbonProperties.getInstance().addProperty(
-          CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
-          config.getEnableUnsafeInQueryExecution());
-    }
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.carbonTable;
     TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
     Configuration config = new Configuration();
@@ -422,56 +416,73 @@ public class CarbonTableReader {
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
     config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
     config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
 
     JobConf jobConf = new JobConf(config);
     List<PartitionSpec> filteredPartitions = new ArrayList();
 
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
 
-    if(partitionInfo!=null && partitionInfo.getPartitionType()== PartitionType.NATIVE_HIVE) {
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
       try {
-        loadMetadataDetails= SegmentStatusManager
-            .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
       } catch (IOException exception) {
         LOGGER.error(exception.getMessage());
         throw exception;
       }
-      filteredPartitions = findRequiredPartitions(constraints, carbonTable,loadMetadataDetails);
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
     }
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);
       CarbonTableInputFormat carbonTableInputFormat =
-          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), filters,filteredPartitions);
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), filters,
+              filteredPartitions);
       Job job = Job.getInstance(jobConf);
       List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
-      CarbonInputSplit carbonInputSplit = null;
       Gson gson = new Gson();
       if (splits != null && splits.size() > 0) {
         for (InputSplit inputSplit : splits) {
-          carbonInputSplit = (CarbonInputSplit) inputSplit;
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
           result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
               carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
               carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
               carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
-              carbonInputSplit.getDeleteDeltaFiles(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
               gson.toJson(carbonInputSplit.getDetailInfo())));
         }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> carbonInput.getSegmentId().concat(carbonInput.getPath()))).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
+
       }
 
     } catch (IOException e) {
       throw new RuntimeException("Error creating Splits from CarbonTableInputFormat", e);
     }
 
-    return result;
+    return multiBlockSplitList;
   }
 
-  /** Returns list of partition specs to query based on the domain constraints
+  /**
+   * Returns list of partition specs to query based on the domain constraints
+   *
    * @param constraints
    * @param carbonTable
    * @throws IOException
    */
-  private List<PartitionSpec> findRequiredPartitions(TupleDomain<ColumnHandle> constraints, CarbonTable carbonTable,
-      LoadMetadataDetails[]loadMetadataDetails) throws IOException {
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<ColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
     Set<PartitionSpec> partitionSpecs = new HashSet<>();
     List<PartitionSpec> prunePartitions = new ArrayList();
 
@@ -490,29 +501,40 @@ public class CarbonTableReader {
     List<String> partitionValuesFromExpression =
         PrestoFilterUtil.getPartitionFilters(carbonTable, constraints);
 
-    List<PartitionSpec> partitionSpecList = partitionSpecs.stream().filter( partitionSpec ->
-        CollectionUtils.isSubCollection(partitionValuesFromExpression, partitionSpec.getPartitions())).collect(Collectors.toList());
+    List<PartitionSpec> partitionSpecList = partitionSpecs.stream().filter(
+        partitionSpec -> CollectionUtils
+            .isSubCollection(partitionValuesFromExpression, partitionSpec.getPartitions()))
+        .collect(Collectors.toList());
 
     prunePartitions.addAll(partitionSpecList);
 
     return prunePartitions;
   }
 
-  private CarbonTableInputFormat<Object>  createInputFormat( Configuration conf,
-      AbsoluteTableIdentifier identifier, Expression filterExpression, List<PartitionSpec> filteredPartitions)
-      throws IOException {
+  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
+      AbsoluteTableIdentifier identifier, Expression filterExpression,
+      List<PartitionSpec> filteredPartitions) throws IOException {
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-    CarbonTableInputFormat.setTablePath(conf,
-        identifier.appendWithLocalPrefix(identifier.getTablePath()));
+    CarbonTableInputFormat
+        .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
-    if(filteredPartitions.size() != 0) {
+    if (filteredPartitions.size() != 0) {
       CarbonTableInputFormat.setPartitionsToPrune(conf, filteredPartitions);
     }
     return format;
   }
 
+  private void populateCarbonProperties() {
+    addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, config.getUnsafeMemoryInMb());
+    addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
+        config.getEnableUnsafeInQueryExecution());
+    addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+        config.getEnableUnsafeColumnPage());
+    addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, config.getEnableUnsafeSort());
+    addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, config.getEnableQueryStatistics());
+  }
   private void setS3Properties(){
-  FileFactory.getConfiguration()
+    FileFactory.getConfiguration()
       .set(ACCESS_KEY, Objects.toString(config.getS3A_AcesssKey(),""));
     FileFactory.getConfiguration()
         .set(SECRET_KEY, Objects.toString(config.getS3A_SecretKey()));
@@ -528,4 +550,25 @@ public class CarbonTableReader {
       Objects.toString(config.getS3EndPoint(),""));
 }
 
+  private void addProperty(String propertyName, String propertyValue) {
+    if (propertyValue != null) {
+      CarbonProperties.getInstance().addProperty(propertyName, propertyValue);
+    }
+  }
+
+  /**
+   * @param cis the local input split whose replica locations are needed
+   * @return the split's replica host locations as an array
+   */
+  private String[] getLocations(CarbonLocalInputSplit cis) {
+    return cis.getLocations().toArray(new String[cis.getLocations().size()]);
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index e6ac386..54f2b5f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -68,32 +68,36 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
    * @return
    * @throws IOException
    */
-  public Block readBlock(Type type)
-      throws IOException
-  {
+  public Block readBlock(Type type) throws IOException {
     int numberOfRows = 0;
     BlockBuilder builder = null;
-    if(isVectorReader) {
+    if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet())
-        {
-          handleNullInVector(type, numberOfRows, builder);
+        if (isDictionary) {
+          if (isShortDecimal(type)) {
+            populateShortDictionaryVector(type, numberOfRows, builder);
+          } else {
+            populateLongDictionaryVector(type, numberOfRows, builder);
+          }
         } else {
-          if(isShortDecimal(type)) {
-            populateShortDecimalVector(type, numberOfRows, builder);
+          if (columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
           } else {
-            populateLongDecimalVector(type, numberOfRows, builder);
+            if (isShortDecimal(type)) {
+              populateShortDecimalVector(type, numberOfRows, builder);
+            } else {
+              populateLongDecimalVector(type, numberOfRows, builder);
+            }
           }
         }
       }
-
     } else {
       if (streamData != null) {
         numberOfRows = streamData.length;
         builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-        for(int i = 0; i < numberOfRows ; i++ ){
+        for (int i = 0; i < numberOfRows; i++) {
           Slice slice = getSlice(streamData[i], type);
           if (isShortDecimal(type)) {
             type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length()));
@@ -211,52 +215,55 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
 
   private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
     DecimalType decimalType = (DecimalType) type;
+    for (int i = 0; i < numberOfRows; i++) {
+      BigDecimal decimalValue = (BigDecimal) columnVector.getData(i);
+      long rescaledDecimal = Decimals
+          .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),
+              decimalType.getScale());
+      type.writeLong(builder, rescaledDecimal);
+    }
+  }
 
-    if (isDictionary) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int)columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
-        if(Objects.isNull(data)) {
-          builder.appendNull();
-        } else {
-          BigDecimal decimalValue = (BigDecimal) data;
-          long rescaledDecimal =
-              Decimals.rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),decimalType.getScale());
-          type.writeLong(builder, rescaledDecimal);
-        }
-      }
-    } else {
-      for (int i = 0; i < numberOfRows; i++) {
-        BigDecimal decimalValue = (BigDecimal) columnVector.getData(i);
-        long rescaledDecimal =
-            Decimals.rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),decimalType.getScale());
+  private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      Slice slice = getSlice((columnVector.getData(i)), type);
+      type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+    }
+  }
+
+  private void populateShortDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    DecimalType decimalType = (DecimalType) type;
+    for (int i = 0; i < numberOfRows; i++) {
+      int value = (int) columnVector.getData(i);
+      Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value),
+          DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
+      if (Objects.isNull(data)) {
+        builder.appendNull();
+      } else {
+        BigDecimal decimalValue = (BigDecimal) data;
+        long rescaledDecimal = Decimals
+            .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),
+                decimalType.getScale());
         type.writeLong(builder, rescaledDecimal);
       }
     }
   }
 
-  private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
-    if (isDictionary) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        DecimalType decimalType = (DecimalType) type;
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
-        if(Objects.isNull(data)) {
-          builder.appendNull();
-        } else {
-          BigDecimal decimalValue = (BigDecimal) data;
-          Slice slice = getSlice(decimalValue, type);
-          type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
-        }
-      }
-    } else {
-      for (int i = 0; i < numberOfRows; i++) {
-        Slice slice = getSlice((columnVector.getData(i)), type);
+  private void populateLongDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    DecimalType decimalType = (DecimalType) type;
+    for (int i = 0; i < numberOfRows; i++) {
+      int value = (int) columnVector.getData(i);
+      Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value),
+          DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
+      if (Objects.isNull(data)) {
+        builder.appendNull();
+      } else {
+        BigDecimal decimalValue = (BigDecimal) data;
+        Slice slice = getSlice(decimalValue, type);
         type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
       }
     }
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/01b48fc3/integration/presto/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/resources/log4j.properties b/integration/presto/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e369916
--- /dev/null
+++ b/integration/presto/src/main/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Root logger option
+log4j.rootLogger=INFO,stdout
+
+
+# Redirect log messages to console
+log4j.appender.debug=org.apache.log4j.RollingFileAppender
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+