You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/04/27 10:20:10 UTC

carbondata git commit: [CARBONDATA-2377][CarbonSearch] Support message throttling in search mode

Repository: carbondata
Updated Branches:
  refs/heads/master 242c08be4 -> 2f85381f8


[CARBONDATA-2377][CarbonSearch] Support message throttling in search mode

This closes #2205


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f85381f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f85381f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f85381f

Branch: refs/heads/master
Commit: 2f85381f8bf945fde219c0d24e9649bc015c2ecc
Parents: 242c08b
Author: Jacky Li <ja...@qq.com>
Authored: Sun Apr 22 12:35:10 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Apr 27 18:19:13 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  21 ++-
 .../impl/SearchModeDetailQueryExecutor.java     |  12 +-
 .../SearchModeVectorDetailQueryExecutor.java    |   7 +-
 .../carbondata/core/scan/model/QueryModel.java  |   8 +
 .../carbondata/core/util/CarbonProperties.java  |  51 ++++++
 .../detailquery/SearchModeTestCase.scala        |   1 +
 .../carbondata/store/SparkCarbonStore.scala     |  34 ++--
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +
 store/search/pom.xml                            |  34 ++++
 .../store/worker/SearchRequestHandler.java      |  24 ++-
 .../scala/org/apache/spark/rpc/Master.scala     | 121 +++++++--------
 .../scala/org/apache/spark/rpc/Scheduler.scala  | 139 +++++++++++++++++
 .../scala/org/apache/spark/rpc/Worker.scala     |   2 +-
 .../org/apache/spark/search/Searcher.scala      |   2 +-
 .../org/apache/spark/rpc/SchedulerSuite.scala   | 154 +++++++++++++++++++
 15 files changed, 523 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 54db6e8..c4b0507 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1649,20 +1649,18 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
   /**
-   * Thread size of static ExecutorService in each Node when using search mode.
-   * Default value is -1, it means that Executors.newCachedThreadPool() will be used to
-   * maximize utilization. If thread numbers has to be limited, set it a positive Integer
-   * will call Executors.newFixedThreadPool(int nThreads) instead
+   * The size of thread pool used for reading files in Work for search mode. By default,
+   * it is number of cores in Worker
    */
   @CarbonProperty
   @InterfaceStability.Unstable
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
 
-  public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
-
   /**
    * In search mode, Master will listen on this port for worker registration
    */
+  @CarbonProperty
+  @InterfaceStability.Unstable
   public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port";
 
   public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020";
@@ -1678,6 +1676,17 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021";
 
+  /**
+   * If number of search request sent to Worker exceed this limit, Master will reschedule
+   * the request to another worker. In such case, locality will be lost in HDFS scenario, but
+   * it is fine for S3 scenario.
+   *
+   * If user does not set this value, by default it is 10 * number of cores in Worker
+   */
+  @CarbonProperty
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT =
+      "carbon.search.worker.workload.limit";
 
   /*
    * whether to enable prefetch for rowbatch to enhance row reconstruction during compaction

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index 484cafd..04669ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -42,13 +42,14 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
   }
 
   private static synchronized void initThreadPool() {
+    int defaultValue = Runtime.getRuntime().availableProcessors();
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
-              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+              String.valueOf(defaultValue)));
     } catch (NumberFormatException e) {
-      nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+      nThread = defaultValue;
       LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
     }
     if (nThread > 0) {
@@ -58,6 +59,13 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
     }
   }
 
+  public static synchronized void shutdownThreadPool() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;
+    }
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 02e8dc1..6c9396b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executors;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -46,13 +45,13 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
   }
 
   private static synchronized void initThreadPool() {
+    int defaultValue = Runtime.getRuntime().availableProcessors();
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
-                      CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD, String.valueOf(defaultValue)));
     } catch (NumberFormatException e) {
-      nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+      nThread = defaultValue;
       LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
           + "Using the default value " + nThread);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 409bc2a..de11d11 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -369,4 +369,12 @@ public class QueryModel {
   public void setRequiredRowId(boolean requiredRowId) {
     this.requiredRowId = requiredRowId;
   }
+
+  @Override
+  public String toString() {
+    return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
+        table.getDatabaseName(), table.getTableName(),
+        projection.getDimensions().size() + projection.getMeasures().size(),
+        filterExpressionResolverTree.getFilterExpression().toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 82080dc..391096d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -45,6 +45,8 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK;
@@ -185,11 +187,39 @@ public final class CarbonProperties {
       case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO:
         validateSchedulerMinRegisteredRatio();
         break;
+      case CARBON_SEARCH_MODE_SCAN_THREAD:
+        validatePositiveInteger(CARBON_SEARCH_MODE_SCAN_THREAD);
+        break;
+      case CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT:
+        validatePositiveInteger(CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT);
+        break;
       // TODO : Validation for carbon.lock.type should be handled for addProperty flow
       default:
         // none
     }
   }
+
+  /**
+   * Validate the specified property is positive integer value
+   */
+  private void validatePositiveInteger(String propertyName) {
+    String value = getInstance().getProperty(propertyName);
+    try {
+      int intValue = Integer.parseInt(value);
+      if (intValue <= 0) {
+        getInstance().removeProperty(propertyName);
+        LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+            "is invalid. Ignoring it", value, propertyName));
+        throw new IllegalArgumentException();
+      }
+    } catch (NumberFormatException e) {
+      getInstance().removeProperty(propertyName);
+      LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+          "is invalid. Ignoring it", value, propertyName));
+      throw e;
+    }
+  }
+
   /**
    * This method validates the loaded properties and loads default
    * values in case of wrong values.
@@ -825,6 +855,15 @@ public final class CarbonProperties {
     return this;
   }
 
+  /**
+   * Remove the specified key in property
+   */
+  public CarbonProperties removeProperty(String key) {
+    carbonProperties.remove(key);
+    addedProperty.remove(key);
+    return this;
+  }
+
   private ColumnarFormatVersion getDefaultFormatVersion() {
     return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
   }
@@ -1501,4 +1540,16 @@ public final class CarbonProperties {
       return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT);
     }
   }
+
+  public static int getMaxWorkloadForWorker(int workerCores) {
+    int defaultValue = workerCores * 10;
+    try {
+      return Integer.parseInt(
+          getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT,
+              String.valueOf(defaultValue)));
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 0e0628e..6921c82 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -32,6 +32,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
   val numRows = 500 * 1000
   override def beforeAll = {
+    sqlContext.sparkContext.setLogLevel("INFO")
     sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
     sql("DROP TABLE IF EXISTS main")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index 279e7b0..c0d0d09 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
@@ -43,6 +44,7 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD
 class SparkCarbonStore extends MetaCachedCarbonStore {
   private var session: SparkSession = _
   private var master: Master = _
+  private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   /**
    * Initialize SparkCarbonStore
@@ -114,8 +116,17 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
   }
 
   def stopSearchMode(): Unit = {
-    master.stopAllWorkers()
+    LOG.info("Shutting down all workers...")
+    try {
+      master.stopAllWorkers()
+      LOG.info("All workers are shutted down")
+    } catch {
+      case e: Exception =>
+        LOG.error(s"failed to shutdown worker: ${e.toString}")
+    }
+    LOG.info("Stopping master...")
     master.stopService()
+    LOG.info("Master stopped")
     master = null
   }
 
@@ -138,14 +149,19 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
     // TODO: how to ensure task is sent to every executor?
     val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
     val masterIp = InetAddress.getLocalHost.getHostAddress
-    session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors).mapPartitions { f =>
-      // start worker
-      Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
-      new Iterator[Int] {
-        override def hasNext: Boolean = false
-        override def next(): Int = 1
-      }
-    }.collect()
+    val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors)
+      .mapPartitions { f =>
+        // start worker
+        Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
+        new Iterator[Int] {
+          override def hasNext: Boolean = false
+
+          override def next(): Int = 1
+        }
+      }.collect()
+    LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} " +
+             s"workers are started successfully")
+    rows
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7da231a..81daece 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -181,6 +181,7 @@ class CarbonSession(@transient val sc: SparkContext,
 
   def startSearchMode(): Unit = {
     CarbonProperties.enableSearchMode(true)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
     if (carbonStore == null) {
       carbonStore = new SparkCarbonStore(this)
       carbonStore.startSearchMode()
@@ -189,6 +190,7 @@ class CarbonSession(@transient val sc: SparkContext,
 
   def stopSearchMode(): Unit = {
     CarbonProperties.enableSearchMode(false)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
     if (carbonStore != null) {
       try {
         carbonStore.stopSearchMode()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
index 00184ca..9d833f2 100644
--- a/store/search/pom.xml
+++ b/store/search/pom.xml
@@ -34,9 +34,15 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -73,6 +79,34 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 7708d8b..8e31395 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
@@ -61,15 +63,22 @@ public class SearchRequestHandler {
 
   public SearchResult handleSearch(SearchRequest request) {
     try {
+      LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
       List<CarbonRow> rows = handleRequest(request);
+      LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
       return createSuccessResponse(request, rows);
     } catch (IOException | InterruptedException e) {
       LOG.error(e);
+      LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
       return createFailureResponse(request, e);
     }
   }
 
   public ShutdownResponse handleShutdown(ShutdownRequest request) {
+    LOG.info("Shutting down worker...");
+    SearchModeDetailQueryExecutor.shutdownThreadPool();
+    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+    LOG.info("Worker shutted down");
     return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
   }
 
@@ -92,8 +101,11 @@ public class SearchRequestHandler {
     long limit = request.limit();
     long rowCount = 0;
 
+    LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
+        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+
     // If there is FGDataMap, prune the split by applying FGDataMap
-    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+    queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit);
 
     // In search mode, reader will read multiple blocks by using a thread pool
     CarbonRecordReader<CarbonRow> reader =
@@ -114,6 +126,8 @@ public class SearchRequestHandler {
     } finally {
       reader.close();
     }
+    LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
+        request.searchId(), rows.size()));
     return rows;
   }
 
@@ -121,7 +135,7 @@ public class SearchRequestHandler {
    * If there is FGDataMap defined for this table and filter condition in the query,
    * prune the splits by the DataMap and set the pruned split into the QueryModel and return
    */
-  private QueryModel tryPruneByFGDataMap(
+  private QueryModel tryPruneByFGDataMap(int queryId,
       CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
     DataMapExprWrapper wrapper =
         DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
@@ -146,6 +160,8 @@ public class SearchRequestHandler {
           blockToRead.add(block);
         }
       }
+      LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d",
+          queryId, blockToRead.size()));
       queryModel.setTableBlockInfos(blockToRead);
     }
     return queryModel;
@@ -167,7 +183,7 @@ public class SearchRequestHandler {
    * create a failure response
    */
   private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
-    return new SearchResult(request.queryId(), Status.FAILURE.ordinal(), throwable.getMessage(),
+    return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
         new Object[0][]);
   }
 
@@ -181,7 +197,7 @@ public class SearchRequestHandler {
     while (itor.hasNext()) {
       output[i++] = itor.next().getData();
     }
-    return new SearchResult(request.queryId(), Status.SUCCESS.ordinal(), "", output);
+    return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index df08ac4..bc44fb6 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -19,10 +19,9 @@ package org.apache.spark.rpc
 
 import java.io.IOException
 import java.net.InetAddress
-import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
+import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
@@ -58,12 +57,13 @@ class Master(sparkConf: SparkConf, port: Int) {
   private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // worker host address map to EndpointRef
-  private val workers = mutable.Map[String, RpcEndpointRef]()
 
   private val random = new Random
 
   private var rpcEnv: RpcEnv = _
 
+  private val scheduler: Scheduler = new Scheduler
+
   def this(sparkConf: SparkConf) = {
     this(sparkConf, CarbonProperties.getSearchMasterPort)
   }
@@ -94,15 +94,15 @@ class Master(sparkConf: SparkConf, port: Int) {
   }
 
   def stopAllWorkers(): Unit = {
-    val futures = workers.mapValues { ref =>
-      ref.ask[ShutdownResponse](ShutdownRequest("user"))
+    val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
+      (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
     }
-    futures.foreach { case (hostname, future) =>
+    futures.foreach { case (address, future) =>
       ThreadUtils.awaitResult(future, Duration.apply("10s"))
       future.value match {
         case Some(result) =>
           result match {
-            case Success(response) => workers.remove(hostname)
+            case Success(response) => scheduler.removeWorker(address)
             case Failure(throwable) => throw new IOException(throwable.getMessage)
           }
         case None => throw new ExecutionTimeoutException
@@ -115,29 +115,18 @@ class Master(sparkConf: SparkConf, port: Int) {
     LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
              s"with ${request.cores} cores")
     val workerId = UUID.randomUUID().toString
-    val workerHostAddress = request.hostAddress
+    val workerAddress = request.hostAddress
     val workerPort = request.port
     LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
 
-    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
-      RpcAddress(workerHostAddress, workerPort), "search-service")
-
-    workers.put(workerHostAddress, endPointRef)
-    LOG.info(s"worker ${request.hostAddress}:${request.port} added")
+    val endPointRef =
+      rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
+    scheduler.addWorker(workerAddress,
+      new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
+    LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
     RegisterWorkerResponse(workerId)
   }
 
-  private def getEndpoint(workerIP: String) = {
-    try {
-      workers(workerIP)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        val index = new Random().nextInt(workers.size)
-        workers.toSeq(index)._2
-    }
-  }
-
   /**
    * Execute search by firing RPC call to worker, return the result rows
    * @param table table to search
@@ -154,57 +143,65 @@ class Master(sparkConf: SparkConf, port: Int) {
     if (globalLimit < 0 || localLimit < 0) {
       throw new IllegalArgumentException("limit should be positive")
     }
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
 
     val queryId = random.nextInt
+    var rowCount = 0
+    val output = new ArrayBuffer[CarbonRow]
+
+    def onSuccess(result: SearchResult): Unit = {
+      // in case of RPC success, collect all rows in response message
+      if (result.queryId != queryId) {
+        throw new IOException(
+          s"queryId in response does not match request: ${result.queryId} != $queryId")
+      }
+      if (result.status != Status.SUCCESS.ordinal()) {
+        throw new IOException(s"failure in worker: ${ result.message }")
+      }
+
+      val itor = result.rows.iterator
+      while (itor.hasNext && rowCount < globalLimit) {
+        output += new CarbonRow(itor.next())
+        rowCount = rowCount + 1
+      }
+      LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
+    }
+    def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
+    def onTimedout() = throw new ExecutionTimeoutException()
+
     // prune data and get a mapping of worker hostname to list of blocks,
     // then add these blocks to the SearchRequest and fire the RPC call
     val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val futures = nodeBlockMapping.asScala.map { case (hostname, blocks) =>
+    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
       // Build a SearchRequest
       val split = new SerializableWritable[CarbonMultiBlockSplit](
-        new CarbonMultiBlockSplit(blocks, hostname))
+        new CarbonMultiBlockSplit(blocks, splitAddress))
       val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
 
-      // fire RPC to worker asynchronously
-      getEndpoint(hostname).ask[SearchResult](request)
+      // Find an Endpoind and send the request to it
+      // This RPC is non-blocking so that we do not need to wait before send to next worker
+      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
     }
-    // get all results from RPC response and return to caller
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
 
-    // Loop to get the result of each Worker
-    futures.foreach { future: Future[SearchResult] =>
+    // loop to get the result of each Worker
+    tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
 
       // if we have enough data already, we do not need to collect more result
       if (rowCount < globalLimit) {
-        // wait on worker for 10s
+        // wait for worker for 10s
         ThreadUtils.awaitResult(future, Duration.apply("10s"))
-        future.value match {
-          case Some(response: Try[SearchResult]) =>
-            response match {
-              case Success(result) =>
-                if (result.queryId != queryId) {
-                  throw new IOException(
-                    s"queryId in response does not match request: ${ result.queryId } != $queryId")
-                }
-                if (result.status != Status.SUCCESS.ordinal()) {
-                  throw new IOException(s"failure in worker: ${ result.message }")
-                }
-
-                val itor = result.rows.iterator
-                while (itor.hasNext && rowCount < globalLimit) {
-                  output += new CarbonRow(itor.next())
-                  rowCount = rowCount + 1
-                }
-
-              case Failure(e) =>
-                throw new IOException(s"exception in worker: ${ e.getMessage }")
-            }
-          case None =>
-            throw new ExecutionTimeoutException()
+        LOG.info(s"[SearchId:$queryId] receive search response from worker " +
+                 s"${worker.address}:${worker.port}")
+        try {
+          future.value match {
+            case Some(response: Try[SearchResult]) =>
+              response match {
+                case Success(result) => onSuccess(result)
+                case Failure(e) => onFaiure(e)
+              }
+            case None => onTimedout()
+          }
+        } finally {
+          worker.workload.decrementAndGet()
         }
       }
     }
@@ -230,12 +227,12 @@ class Master(sparkConf: SparkConf, port: Int) {
     CarbonLoaderUtil.nodeBlockMapping(
       distributables.asJava,
       -1,
-      workers.keySet.toList.asJava,
+      getWorkers.asJava,
       CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST)
   }
 
   /** return hostname of all workers */
-  def getWorkers: JSet[String] = workers.keySet.asJava
+  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
 }
 
 // Exception if execution timed out in search mode

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
new file mode 100644
index 0000000..26208d0
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request
+ */
+private[rpc] class Scheduler {
+  // mapping of worker IP address to worker instance
+  private val workers = mutable.Map[String, Schedulable]()
+  private val random = new Random()
+
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Pick a Worker according to the address and workload of the Worker
+   * Invoke the RPC and return Future result
+   */
+  def sendRequestAsync[T: ClassTag](
+      splitAddress: String,
+      request: Any): (Schedulable, Future[T]) = {
+    require(splitAddress != null)
+    if (workers.isEmpty) {
+      throw new IOException("No worker is available")
+    }
+    var worker = pickWorker(splitAddress)
+
+    // check whether worker exceed max workload, if exceeded, pick next worker
+    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
+    var numTry = workers.size
+    do {
+      if (worker.workload.get() >= maxWorkload) {
+        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
+        worker = pickNextWorker(worker)
+        numTry = numTry - 1
+      } else {
+        numTry = -1
+      }
+    } while (numTry > 0)
+    if (numTry == 0) {
+      // tried so many times and still not able to find Worker
+      throw new WorkerTooBusyException(
+        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
+    }
+    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
+    val future = worker.ref.ask(request)
+    worker.workload.incrementAndGet()
+    (worker, future)
+  }
+
+  private def pickWorker[T: ClassTag](splitAddress: String) = {
+    try {
+      workers(splitAddress)
+    } catch {
+      case e: NoSuchElementException =>
+        // no local worker available, choose one worker randomly
+        pickRandomWorker()
+    }
+  }
+
+  /** pick a worker randomly */
+  private def pickRandomWorker() = {
+    val index = random.nextInt(workers.size)
+    workers.toSeq(index)._2
+  }
+
+  /** pick the next worker of the input worker in the [[Scheduler.workers]] */
+  private def pickNextWorker(worker: Schedulable) = {
+    val index = workers.zipWithIndex.find { case ((address, w), index) =>
+      w == worker
+    }.get._2
+    if (index == workers.size - 1) {
+      workers.toSeq.head._2
+    } else {
+      workers.toSeq(index + 1)._2
+    }
+  }
+
+  /** A new searcher is trying to register, add it to the map and connect to this searcher */
+  def addWorker(address: String, schedulable: Schedulable): Unit = {
+    require(schedulable != null)
+    require(address.equals(schedulable.address))
+    workers(address) = schedulable
+  }
+
+  def removeWorker(address: String): Unit = {
+    workers.remove(address)
+  }
+
+  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
+}
+
+/**
+ * Represent a Worker which [[Scheduler]] can send
+ * Search request on it
+ * @param id Worker ID, a UUID string
+ * @param cores, number of cores in Worker
+ * @param ref RPC endpoint reference
+ * @param workload number of outstanding request sent to Worker
+ */
+private[rpc] class Schedulable(
+    val id: String,
+    val address: String,
+    val port: Int,
+    val cores: Int,
+    val ref: RpcEndpointRef,
+    var workload: AtomicInteger) {
+  def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
+    this(id, address, port, cores, ref, new AtomicInteger())
+  }
+}
+
+class WorkerTooBusyException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
index 39be35f..0f2138a 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 @InterfaceAudience.Internal
 object Worker {
   private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private var hostAddress = InetAddress.getLocalHost.getHostAddress
+  private val hostAddress = InetAddress.getLocalHost.getHostAddress
   private var port: Int = _
 
   def init(masterHostAddress: String, masterPort: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index 4ed796e..e467fd3 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -54,7 +54,7 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
 
 // Search request sent from master to worker
 case class SearchRequest(
-    queryId: Int,
+    searchId: Int,
     split: SerializableWritable[CarbonMultiBlockSplit],
     tableInfo: TableInfo,
     projectColumns: Array[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
new file mode 100644
index 0000000..8780dc0
--- /dev/null
+++ b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
+
+  var scheduler: Scheduler = _
+  var w1: Schedulable = _
+  var w2: Schedulable = _
+  var w3: Schedulable = _
+
+  override def beforeEach(): Unit = {
+    scheduler = new Scheduler()
+    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
+    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
+    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
+
+    scheduler.addWorker("1.1.1.1", w1)
+    scheduler.addWorker("1.1.1.2", w2)
+    scheduler.addWorker("1.1.1.3", w3)
+  }
+
+  test("test addWorker, removeWorker, getAllWorkers") {
+    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    scheduler.removeWorker("1.1.1.2")
+    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
+    scheduler.addWorker("1.1.1.4", w4)
+    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
+    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
+  }
+
+  test("test normal schedule") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r1.id)
+    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r2.id)
+    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r3.id)
+    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r4.id)
+    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r5.id)
+    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r6.id)
+  }
+
+  test("test worker unavailable") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
+    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
+  }
+
+  test("test reschedule when target worker is overload") {
+    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
+    (1 to 40).foreach { i =>
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    // it must be worker1 since worker3 exceed max workload
+    assertResult(w1.id)(r.id)
+  }
+
+  test("test all workers are overload") {
+    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
+    (1 to 40).foreach { i =>
+      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    val e = intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+  }
+
+  test("test user configured overload param") {
+    val original = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
+
+    (1 to 3).foreach { i =>
+      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    val e = intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    if (original != null) {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
+    }
+  }
+
+  test("test invalid property") {
+    intercept[IllegalArgumentException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
+    }
+    var value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+
+    intercept[NumberFormatException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
+    }
+    value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+  }
+}
+
+class DummyRef extends RpcEndpointRef(new SparkConf) {
+  override def address: RpcAddress = null
+
+  override def name: String = ""
+
+  override def send(message: Any): Unit = { }
+
+  override def ask[T](message: Any, timeout: RpcTimeout)
+    (implicit evidence$1: ClassTag[T]): Future[T] = null
+}
\ No newline at end of file