Posted to commits@carbondata.apache.org by in...@apache.org on 2021/06/07 17:00:51 UTC

[carbondata] branch master updated: [CARBONDATA-4143] Enable UT with index server and fix related issues

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d838e3b  [CARBONDATA-4143] Enable UT with index server and fix related issues
d838e3b is described below

commit d838e3b25bd4bd498a34ba26249edabde93eafd4
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Thu Feb 25 15:27:30 2021 +0530

    [CARBONDATA-4143] Enable UT with index server and fix related issues
    
    Why is this PR needed?
    Enable running UTs with the index server.
    Fix the following issues:
    1. With the index server enabled, a select query returns incorrect results with
       SI when the parent and child table segments are not in sync.
    2. When REINDEX is triggered and stale files are present in the segment
       directory, the segment file is written with incorrect file names
       (both valid index and stale merge index file names). As a result, duplicate
       data is present in the SI table, but there are no errors or incorrect query results.
    
    What changes were proposed in this PR?
    Use the useIndexServer flag; exclude some test cases from running with the index server.
    1. While pruning from the index server, the missingSISegments values were not
       being considered. These values are now passed down and set on the filter
       (a minimal sketch of this flow is included after the file list below).
    2. Before loading data into an SI segment, the segment directory is now deleted
       if it is already present.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4098
---
 .../carbondata/core/index/IndexInputFormat.java    | 25 ++++++++++++++
 .../apache/carbondata/core/index/IndexUtil.java    |  7 ++--
 .../carbondata/hadoop/api/CarbonInputFormat.java   | 14 ++++----
 .../secondaryindex/TestCacheOperationsForSI.scala  |  3 +-
 .../carbondata/indexserver/IndexServer.scala       |  8 ++++-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  2 ++
 .../apache/spark/sql/hive/DistributionUtil.scala   |  7 +++-
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 19 +++++++----
 .../spark/sql/test/SparkTestQueryExecutor.scala    | 10 ++++++
 .../spark/sql/test/util/CarbonFunSuite.scala       | 16 +++++++++
 .../org/apache/carbondata/geo/GeoQueryTest.scala   |  9 +++--
 .../scala/org/apache/carbondata/geo/GeoTest.scala  |  3 +-
 .../index/bloom/BloomCoarseGrainIndexSuite.scala   |  8 +++--
 .../index/lucene/LuceneFineGrainIndexSuite.scala   |  4 ++-
 .../allqueries/TestPruneUsingSegmentMinMax.scala   | 15 ++++++---
 .../alterTable/TestAlterTableAddColumns.scala      |  3 +-
 .../TestAlterTableSortColumnsProperty.scala        |  4 ++-
 .../createTable/TestRenameTableWithIndex.scala     |  4 ++-
 .../spark/testsuite/index/CGIndexTestCase.scala    |  4 ++-
 .../spark/testsuite/index/FGIndexTestCase.scala    |  4 ++-
 .../sql/commands/TestCarbonShowCacheCommand.scala  | 39 ++++++++++++++++++++--
 21 files changed, 172 insertions(+), 36 deletions(-)
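
The core of fix 1 is a data flow: the set of SI segments missing from the child table is
collected on the driver, carried with the distributed pruning request, and applied to the
filter on the index server before pruning. Below is a minimal, self-contained sketch of that
flow; FilterStub and PruneRequestStub are hypothetical stand-ins for IndexFilter and
IndexInputFormat, used only for illustration and not CarbonData code.

    import java.util.{HashSet => JHashSet, Set => JSet}

    // Hypothetical stand-in for IndexFilter: only carries the missing-SI-segments set.
    class FilterStub {
      private var missingSISegments: JSet[String] = _
      def setMissingSISegments(segments: JSet[String]): Unit = missingSISegments = segments
      def getMissingSISegments: JSet[String] = missingSISegments
    }

    // Hypothetical stand-in for the pruning request: the set travels with the request and,
    // when present, is applied to the filter before pruning starts.
    class PruneRequestStub {
      private var missingSISegments: JSet[String] = _
      def setMissingSISegments(segments: JSet[String]): Unit = missingSISegments = segments
      def applyTo(filter: FilterStub): Unit = {
        if (missingSISegments != null) {
          filter.setMissingSISegments(missingSISegments)
        }
      }
    }

    object MissingSISegmentsFlow {
      def main(args: Array[String]): Unit = {
        val missing = new JHashSet[String]()
        missing.add("2") // segment "2" is loaded in the main table but not yet in the SI table
        val request = new PruneRequestStub
        request.setMissingSISegments(missing)
        val filter = new FilterStub
        request.applyTo(filter)
        assert(filter.getMissingSISegments.contains("2"))
      }
    }
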

diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
index 1069e85..263fde9 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -98,6 +100,8 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
 
   private boolean isSIPruningEnabled;
 
+  private Set<String> missingSISegments;
+
   IndexInputFormat() {
 
   }
@@ -158,6 +162,7 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
               .getIndex(table, distributable.getDistributable().getIndexSchema());
           IndexFilter filter = new IndexFilter(filterResolverIntf);
           filter.setTable(table);
+          filter.setMissingSISegments(missingSISegments);
           if (filterResolverIntf != null) {
             filter.setExpression(filterResolverIntf.getFilterExpression());
           }
@@ -261,6 +266,15 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     out.writeBoolean(isCountStarJob);
     out.writeBoolean(isAsyncCall);
     out.writeBoolean(isSIPruningEnabled);
+    if (missingSISegments == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(missingSISegments.size());
+      for (String segment : missingSISegments) {
+        out.writeUTF(segment);
+      }
+    }
   }
 
   @Override
@@ -309,6 +323,13 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     this.isCountStarJob = in.readBoolean();
     this.isAsyncCall = in.readBoolean();
     this.isSIPruningEnabled = in.readBoolean();
+    if (in.readBoolean()) {
+      int numMissingSISegments = in.readInt();
+      missingSISegments = new HashSet<>(numMissingSISegments);
+      for (int i = 0; i < numMissingSISegments; i++) {
+        missingSISegments.add(in.readUTF());
+      }
+    }
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -459,4 +480,8 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
   public ReadCommittedScope getReadCommittedScope() {
     return readCommittedScope;
   }
+
+  public void setMissingSISegments(Set<String> missingSISegments) {
+    this.missingSISegments = missingSISegments;
+  }
 }
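
The write()/readFields() hunks above use a common pattern for a nullable collection: a boolean
presence flag, then the element count, then one UTF string per element. The following is a
minimal round-trip sketch of the same pattern over plain java.io streams; it is illustrative
Scala, not the Java class above.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.util.{HashSet => JHashSet, Set => JSet}

    import scala.collection.JavaConverters._

    object NullableSetSerde {
      // Write side: presence flag, then size, then one UTF-encoded segment name per element.
      def write(out: DataOutputStream, segments: JSet[String]): Unit = {
        if (segments == null) {
          out.writeBoolean(false)
        } else {
          out.writeBoolean(true)
          out.writeInt(segments.size())
          segments.asScala.foreach(s => out.writeUTF(s))
        }
      }

      // Read side: rebuild the set only when the presence flag is true, otherwise keep null.
      def read(in: DataInputStream): JSet[String] = {
        if (!in.readBoolean()) {
          null
        } else {
          val size = in.readInt()
          val segments = new JHashSet[String](size)
          (0 until size).foreach(_ => segments.add(in.readUTF()))
          segments
        }
      }

      def main(args: Array[String]): Unit = {
        val segments = new JHashSet[String]()
        segments.add("0")
        segments.add("1")
        val buffer = new ByteArrayOutputStream()
        write(new DataOutputStream(buffer), segments)
        val restored = read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
        assert(restored == segments)

        // A null set round-trips back to null.
        val emptyBuffer = new ByteArrayOutputStream()
        write(new DataOutputStream(emptyBuffer), null)
        assert(read(new DataInputStream(new ByteArrayInputStream(emptyBuffer.toByteArray))) == null)
      }
    }
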
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index e1b090d..69257f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -302,7 +302,7 @@ public class IndexUtil {
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
       List<String> segmentsToBeRefreshed, Configuration configuration) {
     return executeIndexJob(carbonTable, resolver, indexJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed, false, false, configuration);
+        invalidSegments, level, false, segmentsToBeRefreshed, false, false, configuration, null);
   }
 
   /**
@@ -314,7 +314,7 @@ public class IndexUtil {
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
       Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob,
-      boolean isSIPruningEnabled, Configuration configuration) {
+      boolean isSIPruningEnabled, Configuration configuration, Set<String> missingSISegments) {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -323,6 +323,9 @@ public class IndexUtil {
     IndexInputFormat indexInputFormat =
         new IndexInputFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
             partitionsToPrune, false, level, isFallbackJob, false);
+    if (missingSISegments != null) {
+      indexInputFormat.setMissingSISegments(missingSISegments);
+    }
     if (isCountJob) {
       indexInputFormat.setCountStarJob();
       indexInputFormat.setIsWriteToFile(false);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8878029..1998c18 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -442,13 +442,14 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       List<Segment> invalidSegments, List<String> segmentsToBeRefreshed,
       Configuration configuration) {
     return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
-        segmentsToBeRefreshed, true, configuration);
+        segmentsToBeRefreshed, true, configuration, null);
   }
 
-  private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
+  public List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
       List<Segment> validSegments, List<Segment> invalidSegments,
-      List<String> segmentsToBeRefreshed, boolean isCountJob, Configuration configuration) {
+      List<String> segmentsToBeRefreshed, boolean isCountJob, Configuration configuration,
+      Set<String> missingSISegments) {
     boolean isSIPruningEnabled = isSecondaryIndexPruningEnabled(configuration);
     try {
       IndexJob indexJob = (IndexJob) IndexUtil.createIndexJob(IndexUtil.DISTRIBUTED_JOB_NAME);
@@ -458,7 +459,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, indexJob, partitionNames, validSegments,
               invalidSegments, null, false, segmentsToBeRefreshed, isCountJob, isSIPruningEnabled,
-              configuration);
+              configuration, missingSISegments);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -469,7 +470,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, IndexUtil.getEmbeddedJob(), partitionNames,
               validSegments, invalidSegments, null, true, segmentsToBeRefreshed, isCountJob,
-              isSIPruningEnabled, configuration);
+              isSIPruningEnabled, configuration, missingSISegments);
     }
   }
 
@@ -561,7 +562,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       try {
         prunedBlocklets =
             getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, validSegments,
-                invalidSegments, segmentsToBeRefreshed, false, job.getConfiguration());
+                invalidSegments, segmentsToBeRefreshed, false, job.getConfiguration(),
+                filter.getMissingSISegments());
       } catch (Exception e) {
         // Check if fallback is disabled then directly throw exception otherwise try driver
         // pruning.
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCacheOperationsForSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCacheOperationsForSI.scala
index c380811..468414c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCacheOperationsForSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCacheOperationsForSI.scala
@@ -86,7 +86,8 @@ class TestCacheOperationsForSI extends QueryTest with BeforeAndAfterAll {
     sql(s"DROP TABLE $tableName")
   }
 
-  test("Test SI for Show Cache") {
+  // Exclude when running with index server, as show cache rows count varies.
+  test("Test SI for Show Cache", true) {
     val tableName = "t2"
     val indexName = "index1"
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 6db718a..1eee0b0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.indexserver
 
+import java.io.IOException
 import java.net.InetSocketAddress
 import java.security.PrivilegedAction
 import java.util.UUID
@@ -39,7 +40,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.index.{IndexInputFormat, IndexStoreManager}
 import org.apache.carbondata.core.indexstore.{ExtendedBlockletWrapperContainer, SegmentWrapperContainer}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}
 
 @ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
@@ -50,6 +51,7 @@ trait ServerInterface {
   /**
    * Used to prune and cache the index for the table.
    */
+  @throws(classOf[IOException])
   def getSplits(request: IndexInputFormat): ExtendedBlockletWrapperContainer
 
   /**
@@ -111,6 +113,9 @@ object IndexServer extends ServerInterface {
    */
   private def doAs[T](f: => T): T = {
     UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[T] {
+      if (System.getProperty("useIndexServer") != null) {
+        ThreadLocalSessionInfo.getCarbonSessionInfo.getSessionParams.getAddedProps.clear()
+      }
       override def run(): T = {
         f
       }
@@ -161,6 +166,7 @@ object IndexServer extends ServerInterface {
     }
   }
 
+  @throws(classOf[IOException])
   def getSplits(request: IndexInputFormat): ExtendedBlockletWrapperContainer = {
     doAs {
       val sparkSession = SparkSQLUtil.getSparkSession
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index c4b8924..83d1890 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -145,6 +145,8 @@ object DataLoadProcessBuilderOnSpark {
       configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
     if (numPartitions <= 0) {
       numPartitions = convertRDD.partitions.length
+    } else if (System.getProperty("useIndexServer") != null) {
+      convertRDD.partitions
     }
 
     // Because if the number of partitions greater than 1, there will be action operator(sample) in
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index c37d44c..2a1bed5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -91,10 +91,15 @@ object DistributionUtil {
 
   def getExecutors(sparkContext: SparkContext): Map[String, Seq[String]] = {
     val bm = sparkContext.env.blockManager
-    bm.master.getPeers(bm.blockManagerId)
+    val executorMap = bm.master.getPeers(bm.blockManagerId)
       .groupBy(blockManagerId => blockManagerId.host).map {
       case (host, blockManagerIds) => (host, blockManagerIds.map(_.executorId))
     }
+    if (executorMap.isEmpty && bm.blockManagerId.executorId.equalsIgnoreCase("driver")) {
+      Map("localhost" -> Seq("1"))
+    } else {
+      executorMap
+    }
   }
 
   private def getLocalhostIPs = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 5ca1e6b..a78a6fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -620,10 +620,10 @@ object CarbonIndexUtil {
 
           try {
             if (!failedLoadMetadataDetails.isEmpty) {
-              // in the case when in SI table a segment is deleted and it's entry is
-              // deleted from the tablestatus file, the corresponding .segment file from
+              // in the case when in SI table it's entry is deleted from the tablestatus file,
+              // the corresponding segment folder and .segment file from
               // the metadata folder should also be deleted as it contains the
-              // mergefilename which does not exist anymore as the segment is deleted.
+              // mergefilename which may not exist or is not valid.
               deleteStaleSegmentFileIfPresent(carbonLoadModel,
                 indexTable,
                 failedLoadMetadataDetails)
@@ -689,11 +689,18 @@ object CarbonIndexUtil {
     failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
       carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
         if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
-          val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
+          val segmentMetadataFilePath =
+            CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
             CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
-          if (FileFactory.isFileExist(segmentFilePath)) {
+          val segmentPath = CarbonTablePath.getSegmentPath(indexTable.getTablePath,
+            loadMetaData.getLoadName)
+          if (FileFactory.isFileExist(segmentPath)) {
+            // delete the segment folder if it exists as it may contain stale index and data files.
+            FileFactory.deleteFile(segmentPath)
+          }
+          if (FileFactory.isFileExist(segmentMetadataFilePath)) {
             // delete the file if it exists
-            FileFactory.deleteFile(segmentFilePath)
+            FileFactory.deleteFile(segmentMetadataFilePath)
           }
         }
       })
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
index 371b894..e942404 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.indexserver.IndexServer
 
 /**
  * This class is a sql executor of unit test case for spark version 2.x.
@@ -82,6 +83,15 @@ object SparkTestQueryExecutor {
       copyResourcesifNotExists(hdfsUrl, s"$integrationPath/spark/src/test/resources",
         s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")
   }
+  if (System.getProperty("useIndexServer") != null) {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, "localhost")
+      .addProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_PORT, "9998")
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER, "true")
+      .addProperty(CarbonCommonConstants.CARBON_DISABLE_INDEX_SERVER_FALLBACK, "true")
+      .addProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_THREADS_FOR_BLOCK_PRUNING, "1")
+    IndexServer.main(Array())
+  }
   FileFactory.getConfiguration.
     set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
   spark.sparkContext.setLogLevel("ERROR")
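
With the block above, the test executor starts the embedded IndexServer and switches on the
related CarbonProperties only when the useIndexServer system property is set; otherwise it
behaves as before. A hedged sketch of how a run can opt in: the usual route is passing
-DuseIndexServer=true to the test JVM, and the exact plumbing depends on the build setup; the
object name below is hypothetical and only shows the programmatic equivalent.

    object EnableIndexServerForTests {
      def main(args: Array[String]): Unit = {
        // Any non-null value enables the branches guarded by System.getProperty("useIndexServer"),
        // so set it before SparkTestQueryExecutor (and hence the SparkSession) is first referenced.
        System.setProperty("useIndexServer", "true")
        // ... then run the suites; SparkTestQueryExecutor starts the index server on localhost:9998.
      }
    }
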
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/CarbonFunSuite.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/CarbonFunSuite.scala
index 55d652e..1f0af93 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/CarbonFunSuite.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/CarbonFunSuite.scala
@@ -44,4 +44,20 @@ private[spark] abstract class CarbonFunSuite extends FunSuite {
     }
   }
 
+  protected def test(testName : scala.Predef.String, ignoreForIndexServer: Boolean,
+                     testTags : org.scalatest.Tag*)
+                    (testFun : => Unit): Unit = {
+    if (ignoreForIndexServer) {
+      // To skip testcase when useIndexServer property is set.
+      if (System.getProperty("useIndexServer") == null) {
+        super.test(testName)(testFun)
+      }
+    } else {
+      // To run the testcase only when useIndexServer property is set.
+      if (System.getProperty("useIndexServer") != null) {
+        super.test(testName)(testFun)
+      }
+    }
+  }
+
 }
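
The new overload above turns the second argument into an index-server switch: passing true skips
the case when the useIndexServer property is set, passing false runs it only when the property is
set. A minimal usage sketch, with a hypothetical suite and test names; the suites in this commit
get the method by extending QueryTest, which inherits from CarbonFunSuite.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.test.util.QueryTest

    class ExampleIndexServerSuite extends QueryTest {
      // Skipped when -DuseIndexServer is set, e.g. because explain pruning info differs.
      test("example case excluded for index server", true) {
        checkAnswer(sql("select 1"), Seq(Row(1)))
      }

      // Runs only when -DuseIndexServer is set.
      test("example index-server-only case", false) {
        checkAnswer(sql("select 1"), Seq(Row(1)))
      }
    }
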
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
index d56ed73..3cb232a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
@@ -139,7 +139,9 @@ class GeoQueryTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterE
     checkAnswer(df, Seq(Row(64, 79, "1"), Row(39, 37, "2")))
   }
 
-  test("test block pruning with polygon join query") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("test block pruning with polygon join query", true) {
     createTable()
     sql(s"insert into $geoTable select 855280799612,1,2,116285807,40084087")
     sql(s"insert into $geoTable select 855283635086,1,2,116372142,40129503")
@@ -223,7 +225,10 @@ class GeoQueryTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterE
     ).getMessage.contains("Join condition having left column polygon is not GeoId column")
   }
 
-  test("test block pruning on spatial and polygon table with in_polygon_join_range_list udf") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("test block pruning on spatial and polygon table with in_polygon_join_range_list udf",
+    true) {
    createTable()
     sql(s"insert into $geoTable select 855280799612,1,2,116285807,40084087")
     sql(s"insert into $geoTable select 855283635086,1,2,116372142,40129503")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index b911c26..c7b2e5a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -365,7 +365,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       result)
   }
 
-  test("test block pruning for polygon query") {
+  // Exclude when running with index server as it uses UNKNOWN expression to prune.
+  test("test block pruning for polygon query", true) {
     createTable()
     sql(s"insert into $table1 select 855280799612,1575428400000,116285807,40084087")
     sql(s"insert into $table1 select 855283635086,1575428400000,116372142,40129503")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
index 0c83281..749ac48 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
@@ -392,8 +392,10 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
     sql(s"DROP TABLE IF EXISTS $bloomSampleTable")
   }
 
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
   test("test bloom index: " +
-       "multiple indexes with each on one column vs one index on multiple columns") {
+       "multiple indexes with each on one column vs one index on multiple columns", true) {
     val iterations = 1
     // 500000 lines will result to 3 blocklets and bloomfilter index will prune 2 blocklets.
     val index11 = "index11"
@@ -642,7 +644,9 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
       "BloomFilter does not support binary datatype column: cust_id"  ))
   }
 
-  test("test create bloom index on newly added column") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("test create bloom index on newly added column", true) {
     // Fix the loading cores to ensure number of buckets.
     CarbonProperties.getInstance().addProperty("carbon.number.of.cores.while.loading", "1")
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
index 28ff9e7..8026984 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
@@ -550,7 +550,9 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index dm on table index_test_overwrite")
   }
 
-  test("explain query with lucene index") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("explain query with lucene index", true) {
     sql("drop table if exists main")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "8")
     sql(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
index 1077368..c3c2f57 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
@@ -36,7 +36,8 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists parquet")
   }
 
-  test("test if matched segment is only loaded to cache") {
+  // Exclude when running with index server, as show cache result varies.
+  test("test if matched segment is only loaded to cache", true) {
     createTablesAndLoadData
     checkAnswer(sql("select * from carbon where a=1"), sql("select * from parquet where a=1"))
     val showCache = sql("show metacache on table carbon").collect()
@@ -58,7 +59,8 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     // scalastyle:on lineLength
   }
 
-  test("test if matched segment is only loaded to cache after drop column") {
+  // Exclude when running with index server, as show cache result varies.
+  test("test if matched segment is only loaded to cache after drop column", true) {
     createTablesAndLoadData
     checkAnswer(sql("select * from carbon where a=1"), sql("select * from parquet where a=1"))
     checkAnswer(sql("select * from carbon where a=2"), sql("select * from parquet where a=2"))
@@ -78,7 +80,8 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     drop
   }
 
-  test("test if matched segment is only loaded to cache after add column") {
+  // Exclude when running with index server, as show cache result varies.
+  test("test if matched segment is only loaded to cache after add column", true) {
     createTablesAndLoadData
     sql("alter table carbon add columns(g decimal(3,2))")
     sql("insert into carbon values" +
@@ -92,7 +95,8 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     drop
   }
 
-  test("test segment pruning after update operation") {
+  // Exclude when running with index server, as show cache result varies.
+  test("test segment pruning after update operation", true) {
     createTablesAndLoadData
     checkAnswer(sql("select a from carbon where a=1"), Seq(Row(1)))
     var showCache = sql("show metacache on table carbon").collect()
@@ -108,7 +112,8 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     drop
   }
 
-  test("alter set/unset sort column properties") {
+  // Exclude when running with index server, as show cache result varies.
+  test("alter set/unset sort column properties", true) {
     createTablesAndLoadData
     sql(s"alter table carbon set tblproperties('sort_scope'='local_sort', 'sort_columns'='a')")
     sql("insert into carbon values" +
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 48d9bed..f248d72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -277,7 +277,8 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("Test alter add for arrays enabling local dictionary") {
+  // Exclude when running with index server as the returned rows order may vary
+  test("Test alter add for arrays enabling local dictionary", true) {
     import scala.collection.mutable.WrappedArray.make
     createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "ARRAY")
     // For the previous segments the default value for newly added array column is null
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
index d9cd292..55b4245 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
@@ -693,7 +693,9 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
           "where smallIntField = 2 and charField is not null order by floatField"))
   }
 
-  test("bloom filter") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("bloom filter", true) {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
     val tableName = "alter_sc_bloom"
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
index df6a855..947a33a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
@@ -38,7 +38,9 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
   }
 
-  test("Creating a bloomfilter, SI indexSchema,then table rename") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("Creating a bloomfilter indexSchema,then table rename", true) {
     sql(
       s"""
          | CREATE TABLE carbon_table(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
index 2bf46aa..35ff2d2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
@@ -418,7 +418,9 @@ class CGIndexTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select * from normal_test where name='n502670' and city='c2670'"))
   }
 
-  test("test invisible index during query") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("test invisible index during query", true) {
     val tableName = "index_test"
     val indexName1 = "index1"
     val indexName2 = "index2"
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
index e1e64a4..5731c95 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
@@ -508,7 +508,9 @@ class FGIndexTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select * from normal_test where name='n502670' or city='c2670'"))
   }
 
-  test("test invisible indexSchema during query") {
+  // Exclude when running with index server, as pruning info for explain command
+  // not set with index server.
+  test("test invisible indexSchema during query", true) {
     val tableName = "index_test"
     val indexName1 = "index1"
     val indexName2 = "index2"
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index d9fec75..a282fe0 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.sql.commands
 
+import java.util
+
 import scala.collection.JavaConverters._
 
+import mockit.{Mock, MockUp}
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
@@ -29,6 +33,11 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.{IndexUtil, Segment}
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, PartitionSpec}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
 
 class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
   override protected def beforeAll(): Unit = {
@@ -177,7 +186,8 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable")
   }
 
-  test("show cache") {
+  // Exclude when running with index server, as show cache rows count varies.
+  test("show cache", true) {
 
     // Empty database
     sql("use cache_empty_db").collect()
@@ -198,7 +208,8 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     assertResult(0)(indexCacheInfo.length)
   }
 
-  test("show metacache on table") {
+  // Exclude when running with index server, as show cache rows count varies.
+  test("show metacache on table", true) {
     sql("use cache_db").collect()
 
     // Table with Index & Bloom filter
@@ -352,7 +363,8 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable")
   }
 
-  test("test cache expiration using expiringMap with bloom") {
+  // Exclude when running with index server, as show cache rows count varies.
+  test("test cache expiration using expiringMap with bloom", true) {
     sql("drop table if exists carbonTable")
     sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
         "tblproperties('index_cache_expiration_seconds'='1')")
@@ -430,4 +442,25 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  // Runs only when index server is enabled.
+  test("test embedded pruning", false) {
+    val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
+      @Mock
+      def getDistributedSplit(table: CarbonTable, filterResolverIntf: FilterResolverIntf,
+          partitionNames: util.List[PartitionSpec], validSegments: util.List[Segment],
+          invalidSegments: util.List[Segment], segmentsToBeRefreshed: util.List[String],
+          isCountJob: Boolean, configuration: Configuration, missingSISegments: util.Set[String]):
+      util.List[ExtendedBlocklet] = {
+        IndexUtil.executeIndexJob(table, filterResolverIntf, IndexUtil.getEmbeddedJob,
+          partitionNames, validSegments, invalidSegments, null, true,
+          segmentsToBeRefreshed, isCountJob, false, configuration, missingSISegments)
+      }
+    }
+    sql("drop table if exists maintable1")
+    sql("create table maintable1(a string, b int, c string) stored as carbondata")
+    sql("insert into maintable1 select 'k',1,'k'")
+    checkAnswer(sql("select * from maintable1"), Seq(Row("k", 1, "k")))
+    mock.tearDown()
+  }
+
 }