Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/08/23 17:27:47 UTC

[carbondata] branch master updated: [CARBONDATA-3946] Support IndexServer with Presto Engine

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 34b63d6  [CARBONDATA-3946] Support IndexServer with Presto Engine
34b63d6 is described below

commit 34b63d67e54c3ea302e35d6e9f2513d7345b45dd
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Thu Aug 6 17:33:36 2020 +0530

    [CARBONDATA-3946] Support IndexServer with Presto Engine
    
    Why is this PR needed?
    Currently, when the index server is enabled with Presto, queries throw an NPE
    
    What changes were proposed in this PR?
    Use the Configuration from the Job to get the index server client
    Get the QueryId from Presto and set it on the indexFormat
    
    This closes #3885
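    
    In essence, the fix threads the Hadoop Configuration of the running job
    down to the index server client and, on the Presto path (where no
    SparkSession exists), tags the request with Presto's query ID instead of
    a Spark task group. A minimal sketch of the dispatch logic, condensed
    from the DistributedIndexJob change in the diff below:
    
        val isQueryFromPresto = CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO,
            CarbonCommonConstants.IS_QUERY_FROM_PRESTO_DEFAULT).toBoolean
        val client = if (isQueryFromPresto) {
          // Presto: take the query id from the job configuration
          indexFormat.setTaskGroupId(configuration.get("presto.cli.query.id"))
          IndexServer.getClient(configuration)
        } else {
          // Spark: derive task group info from the active SparkSession
          val spark = SparkSQLUtil.getSparkSession
          indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
          IndexServer.getClient
        }
        client.getSplits(indexFormat)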
---
 .../core/constants/CarbonCommonConstants.java      | 10 +++++
 .../carbondata/core/index/AbstractIndexJob.java    |  4 +-
 .../org/apache/carbondata/core/index/IndexJob.java |  5 ++-
 .../apache/carbondata/core/index/IndexUtil.java    | 12 ++---
 docs/prestodb-guide.md                             |  3 ++
 docs/prestosql-guide.md                            | 20 +++++++++
 .../carbondata/hadoop/api/CarbonInputFormat.java   | 36 ++++++++-------
 .../hadoop/api/CarbonTableInputFormat.java         |  4 +-
 .../presto/impl/CarbonLocalInputSplit.java         |  4 +-
 .../apache/carbondata/presto/CarbondataModule.java |  4 ++
 .../carbondata/presto/CarbondataSplitManager.java  |  1 +
 .../carbondata/presto/impl/CarbonTableReader.java  | 11 +++++
 .../apache/carbondata/presto/CarbondataModule.java |  6 +++
 .../carbondata/presto/CarbondataSplitManager.java  |  1 +
 .../carbondata/presto/impl/CarbonTableReader.java  | 11 +++++
 .../apache/carbondata/indexserver/IndexJobs.scala  | 52 +++++++++++++++++-----
 .../carbondata/indexserver/IndexServer.scala       |  8 ++++
 .../jobs/SparkBlockletIndexLoaderJob.scala         |  5 ++-
 .../indexserver/DistributedRDDUtilsTest.scala      |  8 +++-
 19 files changed, 165 insertions(+), 40 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8864963..a271da6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2472,4 +2472,14 @@ public final class CarbonCommonConstants {
   public static final String STRING_LENGTH_EXCEEDED_MESSAGE =
       "Record %s of column %s exceeded " + MAX_CHARS_PER_COLUMN_DEFAULT +
           " characters. Please consider long string data type.";
+
+  /**
+   * Property indicating whether the query is fired from Presto
+   */
+  @CarbonProperty public static final String IS_QUERY_FROM_PRESTO = "is_query_from_presto";
+
+  /**
+   * Default value of the is_query_from_presto property
+   */
+  public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
index fbf023a..44f2df6 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
@@ -36,7 +37,8 @@ public abstract class AbstractIndexJob implements IndexJob {
   }
 
   @Override
-  public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat) {
+  public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat,
+      Configuration configuration) {
     return null;
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
index 608f989..61cfde0 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
@@ -34,8 +35,8 @@ public interface IndexJob extends Serializable {
 
   void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletIndexWrapper> format);
 
-  List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat);
+  List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat, Configuration configuration);
 
-  Long executeCountJob(IndexInputFormat indexInputFormat);
+  Long executeCountJob(IndexInputFormat indexInputFormat, Configuration configuration);
 
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 28449c4..56ee810 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -122,7 +122,7 @@ public class IndexUtil {
         new IndexInputFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
             invalidSegment, true, indexToClear);
     try {
-      indexJob.execute(indexInputFormat);
+      indexJob.execute(indexInputFormat, null);
     } catch (Exception e) {
       // Consider a scenario where clear index job is called from drop table
       // and index server crashes, in this no exception should be thrown and
@@ -273,9 +273,10 @@ public class IndexUtil {
   public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
-      List<String> segmentsToBeRefreshed) {
+      List<String> segmentsToBeRefreshed, Configuration configuration) {
     return executeIndexJob(carbonTable, resolver, indexJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed, false);
+        invalidSegments, level, false, segmentsToBeRefreshed, false,
+        configuration);
   }
 
   /**
@@ -286,7 +287,8 @@ public class IndexUtil {
   public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
-      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob) {
+      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob,
+      Configuration configuration) {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -299,7 +301,7 @@ public class IndexUtil {
       indexInputFormat.setCountStarJob();
       indexInputFormat.setIsWriteToFile(false);
     }
-    return indexJob.execute(indexInputFormat);
+    return indexJob.execute(indexInputFormat, configuration);
   }
 
   public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
diff --git a/docs/prestodb-guide.md b/docs/prestodb-guide.md
index 7b2b2a9..0e45108 100644
--- a/docs/prestodb-guide.md
+++ b/docs/prestodb-guide.md
@@ -301,3 +301,6 @@ Presto carbon only supports reading the carbon table which is written by spark c
 During reading, it supports the non-distributed indexes like block index and bloom index.
 It doesn't support Materialized View as it needs query plan to be changed and presto does not allow it.
 Also, Presto carbon supports streaming segment read from streaming table created by spark.
+
+Presto also supports caching block/blocklet indexes in a distributed index server. Refer to
+[Presto Setup with CarbonData Distributed IndexServer](./prestosql-guide.md#presto-setup-with-carbondata-distributed-indexserver)
diff --git a/docs/prestosql-guide.md b/docs/prestosql-guide.md
index 11bb385..ff05379 100644
--- a/docs/prestosql-guide.md
+++ b/docs/prestosql-guide.md
@@ -24,6 +24,8 @@ This tutorial provides a quick introduction to using current integration/presto
 
 [Presto Single Node Setup for Carbondata](#presto-single-node-setup-for-carbondata)
 
+[Presto Setup with CarbonData Distributed IndexServer](#presto-setup-with-carbondata-distributed-indexserver)
+
 ## Presto Multinode Cluster Setup for Carbondata
 ### Installing Presto
 
@@ -301,3 +303,21 @@ Presto carbon only supports reading the carbon table which is written by spark c
 During reading, it supports the non-distributed index like block index and bloom index.
 It doesn't support Materialized View as it needs query plan to be changed and presto does not allow it.
 Also, Presto carbon supports streaming segment read from streaming table created by spark.
+
+## Presto Setup with CarbonData Distributed IndexServer
+
+### Dependency jars
+After copying all the jars from ../integration/presto/target/carbondata-presto-X.Y.Z-SNAPSHOT
+to the `plugin/carbondata` directory on all nodes, copy the following jars as well.
+1. Copy ../integration/spark/target/carbondata-spark_X.Y.Z-SNAPSHOT.jar
+2. Copy the corresponding Spark dependency jars to the same location.
+
+### Configure properties
+Configure the IndexServer properties in the carbon.properties file. Refer to
+[Configuring IndexServer](https://github.com/apache/carbondata/blob/master/docs/index-server.md#Configurations) for more info.
+Add `-Dcarbon.properties.filepath=<path>/carbon.properties` to the jvm.config file.
+
+### Presto with IndexServer
+Start the distributed index server. Launch the Presto CLI, fire a SELECT query, and check that the
+corresponding job is fired in the index server application. Users can use Spark to view the loaded
+cache with the SHOW METACACHE command. Refer: [MetaCacheDDL](./ddl-of-carbondata.md#cache)
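
For orientation, a minimal carbon.properties sketch for this setup could look like the
following; the property names come from the index-server guide linked above, while the
host, port, and file path values are placeholders to adapt:

    # illustrative values; see the index-server Configurations page for the full list
    carbon.enable.index.server=true
    carbon.index.server.ip=<index-server-host>
    carbon.index.server.port=<index-server-port>

    # jvm.config entry pointing at this file (path is a placeholder)
    -Dcarbon.properties.filepath=/etc/carbondata/carbon.properties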
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 557fbfa..43cbe1f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -391,8 +391,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * table. If the job fails for some reason then an embedded job is fired to
    * get the count.
    */
-  Long getDistributedCount(CarbonTable table,
-      List<PartitionSpec> partitionNames, List<Segment> validSegments) {
+  Long getDistributedCount(CarbonTable table, List<PartitionSpec> partitionNames,
+      List<Segment> validSegments, Configuration configuration) {
     IndexInputFormat indexInputFormat =
         new IndexInputFormat(table, null, validSegments, new ArrayList<>(),
             partitionNames, false, null, false, false);
@@ -402,25 +402,26 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (indexJob == null) {
         throw new ExceptionInInitializerError("Unable to create index job");
       }
-      return indexJob.executeCountJob(indexInputFormat);
+      return indexJob.executeCountJob(indexInputFormat, configuration);
     } catch (Exception e) {
       LOG.error("Failed to get count from index server. Initializing fallback", e);
       IndexJob indexJob = IndexUtil.getEmbeddedJob();
-      return indexJob.executeCountJob(indexInputFormat);
+      return indexJob.executeCountJob(indexInputFormat, configuration);
     }
   }
 
   List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
       List<PartitionSpec> partitionNames, List<Segment> validSegments,
-      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) {
+      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed,
+      Configuration configuration) {
     return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
-        segmentsToBeRefreshed, true);
+        segmentsToBeRefreshed, true, configuration);
   }
 
   private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
       List<Segment> validSegments, List<Segment> invalidSegments,
-      List<String> segmentsToBeRefreshed, boolean isCountJob) {
+      List<String> segmentsToBeRefreshed, boolean isCountJob, Configuration configuration) {
     try {
       IndexJob indexJob = (IndexJob) IndexUtil.createIndexJob(IndexUtil.DISTRIBUTED_JOB_NAME);
       if (indexJob == null) {
@@ -428,7 +429,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, indexJob, partitionNames, validSegments,
-              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
+              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob, configuration);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -436,9 +437,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
       LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
           + "back to embedded mode", e);
-      return IndexUtil.executeIndexJob(table, filterResolverIntf,
-          IndexUtil.getEmbeddedJob(), partitionNames, validSegments,
-          invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
+      return IndexUtil
+          .executeIndexJob(table, filterResolverIntf, IndexUtil.getEmbeddedJob(), partitionNames,
+              validSegments, invalidSegments, null, true, segmentsToBeRefreshed, isCountJob,
+              configuration);
     }
   }
 
@@ -528,7 +530,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       try {
         prunedBlocklets =
             getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
-                invalidSegments, segmentsToBeRefreshed, false);
+                invalidSegments, segmentsToBeRefreshed, false, job.getConfiguration());
       } catch (Exception e) {
         // Check if fallback is disabled then directly throw exception otherwise try driver
         // pruning.
@@ -568,7 +570,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           if (distributedCG && indexJob != null) {
             cgPrunedBlocklets = IndexUtil
                 .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>());
+                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>(),
+                    job.getConfiguration());
           } else {
             cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune);
           }
@@ -602,9 +605,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           // Prune segments from already pruned blocklets
           IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
           // Prune segments from already pruned blocklets
-          fgPrunedBlocklets = IndexUtil.executeIndexJob(
-              carbonTable, filter.getResolver(), indexJob, partitionsToPrune, segmentIds,
-              invalidSegments, fgIndexExprWrapper.getIndexLevel(), new ArrayList<>());
+          fgPrunedBlocklets = IndexUtil
+              .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
+                  segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
+                  new ArrayList<>(), job.getConfiguration());
           // note that the 'fgPrunedBlocklets' has extra index related info compared with
           // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
           prunedBlocklets =
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 2dd52a4..e3aacc0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -482,7 +482,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         try {
           List<ExtendedBlocklet> extendedBlocklets =
               getDistributedBlockRowCount(table, partitions, filteredSegment,
-                  allSegments.getInvalidSegments(), toBeCleanedSegments);
+                  allSegments.getInvalidSegments(), toBeCleanedSegments, job.getConfiguration());
           for (ExtendedBlocklet blocklet : extendedBlocklets) {
             String filePath = blocklet.getFilePath().replace("\\", "/");
             String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
@@ -538,7 +538,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
         totalRowCount =
-            getDistributedCount(table, partitions, filteredSegment);
+            getDistributedCount(table, partitions, filteredSegment, job.getConfiguration());
       } else {
         TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(table);
         totalRowCount = defaultIndex.getRowCount(filteredSegment, partitions, defaultIndex);
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index dc50b8e..815775c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -153,7 +153,9 @@ public class CarbonLocalInputSplit {
         ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
         carbonLocalInputSplit.getDeleteDeltaFiles());
     inputSplit.setFormat(carbonLocalInputSplit.getFileFormat());
-    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()) {
+    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()
+        && null != carbonLocalInputSplit.detailInfo && !carbonLocalInputSplit.detailInfo
+        .equalsIgnoreCase("null")) {
       Gson gson = new Gson();
       BlockletDetailInfo blockletDetailInfo =
           gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
index 81b2476..56b52b0 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
@@ -167,6 +167,10 @@ public class CarbondataModule extends HiveClientModule {
     binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
 
     configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
+    // configure carbon properties
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
   }
 
 }
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
index 5bcadd9..fbb387f 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -101,6 +101,7 @@ public class CarbondataSplitManager extends HiveSplitManager {
     HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
     SchemaTableName schemaTableName = layout.getSchemaTableName();
 
+    carbonTableReader.setPrestoQueryId(session.getQueryId());
     // get table metadata
     SemiTransactionalHiveMetastore metastore =
         metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
index a571bef..d4d4e88 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -103,6 +103,11 @@ public class CarbonTableReader {
   private String queryId;
 
   /**
+   * presto cli query id
+   */
+  private String prestoQueryId;
+
+  /**
    * Logger instance
    */
   private static final Logger LOGGER =
@@ -256,6 +261,7 @@ public class CarbonTableReader {
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.getCarbonTable();
     TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set("presto.cli.query.id", prestoQueryId);
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -397,4 +403,9 @@ public class CarbonTableReader {
   public void setQueryId(String queryId) {
     this.queryId = queryId;
   }
+
+  public void setPrestoQueryId(String prestoQueryId) {
+    this.prestoQueryId = prestoQueryId;
+  }
+
 }
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
index b8c3c0b..b7128da 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
@@ -21,6 +21,8 @@ import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
 import com.google.inject.Binder;
@@ -162,6 +164,10 @@ public class CarbondataModule extends HiveModule {
     binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
 
     configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
+    // configure carbon properties
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
   }
 
 }
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
index fe1dd9a..76c3fd0 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -110,6 +110,7 @@ public class CarbondataSplitManager extends HiveSplitManager {
     HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
     SchemaTableName schemaTableName = hiveTable.getSchemaTableName();
 
+    carbonTableReader.setPrestoQueryId(session.getQueryId());
     // get table metadata
     SemiTransactionalHiveMetastore metastore =
         metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7364942..de59a9f 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -101,6 +101,11 @@ public class CarbonTableReader {
   private String queryId;
 
   /**
+   * presto cli query id
+   */
+  private String prestoQueryId;
+
+  /**
    * Logger instance
    */
   private static final Logger LOGGER =
@@ -254,6 +259,7 @@ public class CarbonTableReader {
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.getCarbonTable();
     TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set("presto.cli.query.id", prestoQueryId);
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -395,4 +401,9 @@ public class CarbonTableReader {
   public void setQueryId(String queryId) {
     this.queryId = queryId;
   }
+
+  public void setPrestoQueryId(String prestoQueryId) {
+    this.prestoQueryId = prestoQueryId;
+  }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
index 6957a60..a81c202 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
@@ -20,11 +20,13 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.Logger
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.index.{AbstractIndexJob, IndexInputFormat}
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -32,7 +34,7 @@ import org.apache.carbondata.core.scan.expression.BinaryExpression
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
 
 /**
@@ -43,7 +45,8 @@ class DistributedIndexJob extends AbstractIndexJob {
 
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  override def execute(indexFormat: IndexInputFormat): util.List[ExtendedBlocklet] = {
+  override def execute(indexFormat: IndexInputFormat,
+      configuration: Configuration): util.List[ExtendedBlocklet] = {
     if (LOGGER.isDebugEnabled) {
       val messageSize = SizeEstimator.estimate(indexFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
@@ -54,15 +57,32 @@ class DistributedIndexJob extends AbstractIndexJob {
       .info("Temp folder path for Query ID: " + indexFormat.getQueryId + " is " + splitFolderPath)
     val (response, time) = logTime {
       try {
-        val spark = SparkSQLUtil.getSparkSession
-        indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
-        indexFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
+        val isQueryFromPresto = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO,
+            CarbonCommonConstants.IS_QUERY_FROM_PRESTO_DEFAULT)
+          .toBoolean
+        // In case of presto with index server flow, sparkSession will be null
+        if (!isQueryFromPresto) {
+          val spark = SparkSQLUtil.getSparkSession
+          indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
+          indexFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
+        } else {
+          val queryId = configuration.get("presto.cli.query.id")
+          if (null != queryId) {
+            indexFormat.setTaskGroupId(queryId)
+          }
+        }
         var filterInf = indexFormat.getFilterResolverIntf
         val filterProcessor = new FilterExpressionProcessor
         filterInf = removeSparkUnknown(filterInf,
           indexFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
         indexFormat.setFilterResolverIntf(filterInf)
-        IndexServer.getClient.getSplits(indexFormat)
+        val client = if (isQueryFromPresto) {
+          IndexServer.getClient(configuration)
+        } else {
+          IndexServer.getClient
+        }
+        client.getSplits(indexFormat)
           .getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath, indexFormat
             .getQueryId, indexFormat.isCountStarJob)
       } finally {
@@ -110,8 +130,18 @@ class DistributedIndexJob extends AbstractIndexJob {
     filterInf
   }
 
-  override def executeCountJob(indexFormat: IndexInputFormat): java.lang.Long = {
-    IndexServer.getClient.getCount(indexFormat).get()
+  override def executeCountJob(indexFormat: IndexInputFormat,
+      configuration: Configuration): java.lang.Long = {
+    val isQueryFromPresto = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO,
+        CarbonCommonConstants.IS_QUERY_FROM_PRESTO_DEFAULT)
+      .toBoolean
+    val client = if (isQueryFromPresto) {
+      IndexServer.getClient(configuration)
+    } else {
+      IndexServer.getClient
+    }
+    client.getCount(indexFormat).get()
   }
 }
 
@@ -121,7 +151,8 @@ class DistributedIndexJob extends AbstractIndexJob {
  */
 class EmbeddedIndexJob extends AbstractIndexJob {
 
-  override def execute(indexFormat: IndexInputFormat): util.List[ExtendedBlocklet] = {
+  override def execute(indexFormat: IndexInputFormat,
+      configuration: Configuration): util.List[ExtendedBlocklet] = {
     val spark = SparkSQLUtil.getSparkSession
     val originalJobDesc = spark.sparkContext.getLocalProperty("spark.job.description")
     indexFormat.setIsWriteToFile(false)
@@ -137,7 +168,8 @@ class EmbeddedIndexJob extends AbstractIndexJob {
     splits
   }
 
-  override def executeCountJob(inputFormat: IndexInputFormat): java.lang.Long = {
+  override def executeCountJob(inputFormat: IndexInputFormat,
+      configuration: Configuration): java.lang.Long = {
     inputFormat.setFallbackJob()
     IndexServer.getCount(inputFormat).get()
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 9798e80..0d32007 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -305,6 +305,14 @@ object IndexServer extends ServerInterface {
   def getClient: ServerInterface = {
     val sparkSession = SparkSQLUtil.getSparkSession
     val configuration = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    getClient(configuration)
+  }
+
+  /**
+   * @return Return a new Client to communicate with the Index Server.
+   */
+  def getClient(configuration: Configuration): ServerInterface = {
+
     import org.apache.hadoop.ipc.RPC
     RPC.getProtocolProxy(classOf[ServerInterface],
       RPC.getProtocolVersion(classOf[ServerInterface]),
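
The getClient(Configuration) overload above lets callers that have no active
SparkSession (the Presto flow) build the RPC proxy directly from a supplied
Hadoop Configuration. A hypothetical caller, assuming the index server host and
port are already available through carbon.properties and that indexFormat is an
already-built IndexInputFormat:

    import org.apache.hadoop.conf.Configuration
    // indexFormat is assumed to exist; see IndexInputFormat in the core module
    val client = IndexServer.getClient(new Configuration())
    val rowCount = client.getCount(indexFormat).get()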
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
index 7cb1750..b50b3d9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
@@ -90,7 +90,10 @@ class SparkBlockletIndexLoaderJob extends AbstractIndexJob {
     }
   }
 
-  override def executeCountJob(indexFormat: IndexInputFormat): lang.Long = 0L
+  override def executeCountJob(indexFormat: IndexInputFormat,
+      configuration: Configuration): lang.Long = {
+    0L
+  }
 }
 
 class IndexCacher(
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index 8f4da6a..b36f7ea 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -20,9 +20,13 @@ package org.apache.indexserver
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
+
 import mockit.{Mock, MockUp}
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
 import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
 import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
@@ -144,7 +148,7 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
       }
     }
     try{
-      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
     } catch {
       case ex: Exception =>
     }
@@ -176,7 +180,7 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
       }
     }
     try{
-      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
     } catch {
       case ex: Exception =>
     }