Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/04/27 10:20:10 UTC
carbondata git commit: [CARBONDATA-2377][CarbonSearch] Support message throttling in search mode
Repository: carbondata
Updated Branches:
refs/heads/master 242c08be4 -> 2f85381f8
[CARBONDATA-2377][CarbonSearch] Support message throttling in search mode
This closes #2205
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f85381f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f85381f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f85381f
Branch: refs/heads/master
Commit: 2f85381f8bf945fde219c0d24e9649bc015c2ecc
Parents: 242c08b
Author: Jacky Li <ja...@qq.com>
Authored: Sun Apr 22 12:35:10 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Apr 27 18:19:13 2018 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 21 ++-
.../impl/SearchModeDetailQueryExecutor.java | 12 +-
.../SearchModeVectorDetailQueryExecutor.java | 7 +-
.../carbondata/core/scan/model/QueryModel.java | 8 +
.../carbondata/core/util/CarbonProperties.java | 51 ++++++
.../detailquery/SearchModeTestCase.scala | 1 +
.../carbondata/store/SparkCarbonStore.scala | 34 ++--
.../org/apache/spark/sql/CarbonSession.scala | 2 +
store/search/pom.xml | 34 ++++
.../store/worker/SearchRequestHandler.java | 24 ++-
.../scala/org/apache/spark/rpc/Master.scala | 121 +++++++--------
.../scala/org/apache/spark/rpc/Scheduler.scala | 139 +++++++++++++++++
.../scala/org/apache/spark/rpc/Worker.scala | 2 +-
.../org/apache/spark/search/Searcher.scala | 2 +-
.../org/apache/spark/rpc/SchedulerSuite.scala | 154 +++++++++++++++++++
15 files changed, 523 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 54db6e8..c4b0507 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1649,20 +1649,18 @@ public final class CarbonCommonConstants {
public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
/**
- * Thread size of static ExecutorService in each Node when using search mode.
- * Default value is -1, it means that Executors.newCachedThreadPool() will be used to
- * maximize utilization. If thread numbers has to be limited, set it a positive Integer
- * will call Executors.newFixedThreadPool(int nThreads) instead
+ * The size of the thread pool used for reading files in the Worker for search mode.
+ * By default, it is the number of cores in the Worker
*/
@CarbonProperty
@InterfaceStability.Unstable
public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
- public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
-
/**
* In search mode, Master will listen on this port for worker registration
*/
+ @CarbonProperty
+ @InterfaceStability.Unstable
public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port";
public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020";
@@ -1678,6 +1676,17 @@ public final class CarbonCommonConstants {
public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021";
+ /**
+ * If the number of search requests sent to a Worker exceeds this limit, the Master will
+ * reschedule the request to another worker. In that case, data locality is lost in the
+ * HDFS scenario, but that is fine for the S3 scenario.
+ *
+ * If the user does not set this value, it defaults to 10 * the number of cores in the Worker
+ */
+ @CarbonProperty
+ @InterfaceStability.Unstable
+ public static final String CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT =
+ "carbon.search.worker.workload.limit";
/*
* whether to enable prefetch for rowbatch to enhance row reconstruction during compaction
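As an aside for readers tuning this knob: a minimal sketch (not part of this commit) of setting the new limit through CarbonProperties; the value "20" is arbitrary.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Cap each Worker at 20 outstanding search requests. Non-positive or
    // non-integer values are rejected by the validation added in this commit.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "20")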
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index 484cafd..04669ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -42,13 +42,14 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
}
private static synchronized void initThreadPool() {
+ int defaultValue = Runtime.getRuntime().availableProcessors();
int nThread;
try {
nThread = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
- CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+ String.valueOf(defaultValue)));
} catch (NumberFormatException e) {
- nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+ nThread = defaultValue;
LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
}
if (nThread > 0) {
@@ -58,6 +59,13 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
}
}
+ public static synchronized void shutdownThreadPool() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+ }
+
@Override
public CarbonIterator<Object> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
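The scan-thread default now tracks the Worker's core count instead of the old "-1" sentinel. A standalone Scala sketch (not part of this commit) of the parse-with-fallback pattern used in initThreadPool; the helper name resolveScanThreads is hypothetical.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Use the configured scan thread count if it parses as an integer,
    // otherwise fall back to the number of available cores.
    def resolveScanThreads(): Int = {
      val defaultValue = Runtime.getRuntime.availableProcessors()
      try {
        CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
            String.valueOf(defaultValue))
          .toInt
      } catch {
        case _: NumberFormatException => defaultValue
      }
    }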
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 02e8dc1..6c9396b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executors;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -46,13 +45,13 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
}
private static synchronized void initThreadPool() {
+ int defaultValue = Runtime.getRuntime().availableProcessors();
int nThread;
try {
nThread = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
- CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+ .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD, String.valueOf(defaultValue)));
} catch (NumberFormatException e) {
- nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+ nThread = defaultValue;
LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
+ "Using the default value " + nThread);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 409bc2a..de11d11 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -369,4 +369,12 @@ public class QueryModel {
public void setRequiredRowId(boolean requiredRowId) {
this.requiredRowId = requiredRowId;
}
+
+ @Override
+ public String toString() {
+ return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
+ table.getDatabaseName(), table.getTableName(),
+ projection.getDimensions().size() + projection.getMeasures().size(),
+ filterExpressionResolverTree.getFilterExpression().toString());
+ }
}
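For illustration, the new toString renders a query roughly as follows (table name and filter are hypothetical):

    scan on table default.main, 2 projection columns with filter (city = 'shanghai')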
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 82080dc..391096d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -45,6 +45,8 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK;
@@ -185,11 +187,39 @@ public final class CarbonProperties {
case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO:
validateSchedulerMinRegisteredRatio();
break;
+ case CARBON_SEARCH_MODE_SCAN_THREAD:
+ validatePositiveInteger(CARBON_SEARCH_MODE_SCAN_THREAD);
+ break;
+ case CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT:
+ validatePositiveInteger(CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT);
+ break;
// TODO : Validation for carbon.lock.type should be handled for addProperty flow
default:
// none
}
}
+
+ /**
+ * Validate the specified property is positive integer value
+ */
+ private void validatePositiveInteger(String propertyName) {
+ String value = getInstance().getProperty(propertyName);
+ try {
+ int intValue = Integer.parseInt(value);
+ if (intValue <= 0) {
+ getInstance().removeProperty(propertyName);
+ LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+ "is invalid. Ignoring it", value, propertyName));
+ throw new IllegalArgumentException();
+ }
+ } catch (NumberFormatException e) {
+ getInstance().removeProperty(propertyName);
+ LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+ "is invalid. Ignoring it", value, propertyName));
+ throw e;
+ }
+ }
+
/**
* This method validates the loaded properties and loads default
* values in case of wrong values.
@@ -825,6 +855,15 @@ public final class CarbonProperties {
return this;
}
+ /**
+ * Remove the specified key in property
+ */
+ public CarbonProperties removeProperty(String key) {
+ carbonProperties.remove(key);
+ addedProperty.remove(key);
+ return this;
+ }
+
private ColumnarFormatVersion getDefaultFormatVersion() {
return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
}
@@ -1501,4 +1540,16 @@ public final class CarbonProperties {
return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT);
}
}
+
+ public static int getMaxWorkloadForWorker(int workerCores) {
+ int defaultValue = workerCores * 10;
+ try {
+ return Integer.parseInt(
+ getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT,
+ String.valueOf(defaultValue)));
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
}
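A minimal sketch (not part of this commit) of how the limit resolves on the Master side; the core count 4 is arbitrary.

    import org.apache.carbondata.core.util.CarbonProperties

    // For a Worker that registered with 4 cores, this returns the configured
    // carbon.search.worker.workload.limit, or 4 * 10 = 40 when unset.
    val maxWorkload: Int = CarbonProperties.getMaxWorkloadForWorker(4)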
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 0e0628e..6921c82 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -32,6 +32,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
val numRows = 500 * 1000
override def beforeAll = {
+ sqlContext.sparkContext.setLogLevel("INFO")
sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
sql("DROP TABLE IF EXISTS main")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index 279e7b0..c0d0d09 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.Expression
@@ -43,6 +44,7 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD
class SparkCarbonStore extends MetaCachedCarbonStore {
private var session: SparkSession = _
private var master: Master = _
+ private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
/**
* Initialize SparkCarbonStore
@@ -114,8 +116,17 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
}
def stopSearchMode(): Unit = {
- master.stopAllWorkers()
+ LOG.info("Shutting down all workers...")
+ try {
+ master.stopAllWorkers()
+ LOG.info("All workers are shutted down")
+ } catch {
+ case e: Exception =>
+ LOG.error(s"failed to shutdown worker: ${e.toString}")
+ }
+ LOG.info("Stopping master...")
master.stopService()
+ LOG.info("Master stopped")
master = null
}
@@ -138,14 +149,19 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
// TODO: how to ensure task is sent to every executor?
val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
val masterIp = InetAddress.getLocalHost.getHostAddress
- session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors).mapPartitions { f =>
- // start worker
- Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
- new Iterator[Int] {
- override def hasNext: Boolean = false
- override def next(): Int = 1
- }
- }.collect()
+ val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors)
+ .mapPartitions { f =>
+ // start worker
+ Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
+ new Iterator[Int] {
+ override def hasNext: Boolean = false
+
+ override def next(): Int = 1
+ }
+ }.collect()
+ LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} " +
+ s"workers are started successfully")
+ rows
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7da231a..81daece 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -181,6 +181,7 @@ class CarbonSession(@transient val sc: SparkContext,
def startSearchMode(): Unit = {
CarbonProperties.enableSearchMode(true)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
if (carbonStore == null) {
carbonStore = new SparkCarbonStore(this)
carbonStore.startSearchMode()
@@ -189,6 +190,7 @@ class CarbonSession(@transient val sc: SparkContext,
def stopSearchMode(): Unit = {
CarbonProperties.enableSearchMode(false)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
if (carbonStore != null) {
try {
carbonStore.stopSearchMode()
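A minimal usage sketch (not part of this commit) of the lifecycle these hooks manage, assuming an existing CarbonSession named carbon and an illustrative query:

    // startSearchMode launches the Master and Workers and, as of this commit,
    // disables the vector reader; stopSearchMode shuts the Workers down and restores it.
    carbon.startSearchMode()
    carbon.sql("SELECT * FROM main WHERE id = '100'").show()
    carbon.stopSearchMode()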
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
index 00184ca..9d833f2 100644
--- a/store/search/pom.xml
+++ b/store/search/pom.xml
@@ -34,9 +34,15 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -73,6 +79,34 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 7708d8b..8e31395 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
@@ -61,15 +63,22 @@ public class SearchRequestHandler {
public SearchResult handleSearch(SearchRequest request) {
try {
+ LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
List<CarbonRow> rows = handleRequest(request);
+ LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
return createSuccessResponse(request, rows);
} catch (IOException | InterruptedException e) {
LOG.error(e);
+ LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
return createFailureResponse(request, e);
}
}
public ShutdownResponse handleShutdown(ShutdownRequest request) {
+ LOG.info("Shutting down worker...");
+ SearchModeDetailQueryExecutor.shutdownThreadPool();
+ SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+ LOG.info("Worker shutted down");
return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
}
@@ -92,8 +101,11 @@ public class SearchRequestHandler {
long limit = request.limit();
long rowCount = 0;
+ LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
+ request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+
// If there is FGDataMap, prune the split by applying FGDataMap
- queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+ queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit);
// In search mode, reader will read multiple blocks by using a thread pool
CarbonRecordReader<CarbonRow> reader =
@@ -114,6 +126,8 @@ public class SearchRequestHandler {
} finally {
reader.close();
}
+ LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
+ request.searchId(), rows.size()));
return rows;
}
@@ -121,7 +135,7 @@ public class SearchRequestHandler {
* If there is FGDataMap defined for this table and filter condition in the query,
* prune the splits by the DataMap and set the pruned split into the QueryModel and return
*/
- private QueryModel tryPruneByFGDataMap(
+ private QueryModel tryPruneByFGDataMap(int queryId,
CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
DataMapExprWrapper wrapper =
DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
@@ -146,6 +160,8 @@ public class SearchRequestHandler {
blockToRead.add(block);
}
}
+ LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d",
+ queryId, blockToRead.size()));
queryModel.setTableBlockInfos(blockToRead);
}
return queryModel;
@@ -167,7 +183,7 @@ public class SearchRequestHandler {
* create a failure response
*/
private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
- return new SearchResult(request.queryId(), Status.FAILURE.ordinal(), throwable.getMessage(),
+ return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
new Object[0][]);
}
@@ -181,7 +197,7 @@ public class SearchRequestHandler {
while (itor.hasNext()) {
output[i++] = itor.next().getData();
}
- return new SearchResult(request.queryId(), Status.SUCCESS.ordinal(), "", output);
+ return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
}
}
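Putting the new logging together: for a hypothetical request with searchId 42 on a table with an FG DataMap, the Worker-side log sequence added in this file reads roughly as:

    [SearchId:42] receive search request
    [SearchId:42] scan on table default.main, 2 projection columns with filter (...), number of blocks: 5
    [SearchId:42] pruned using FG DataMap, pruned blocks: 3
    [SearchId:42] scan completed, return 100 rows
    [SearchId:42] sending success response

The id, table, filter, and counts above are illustrative.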
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index df08ac4..bc44fb6 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -19,10 +19,9 @@ package org.apache.spark.rpc
import java.io.IOException
import java.net.InetAddress
-import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
+import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
import scala.collection.JavaConverters._
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration.Duration
@@ -58,12 +57,13 @@ class Master(sparkConf: SparkConf, port: Int) {
private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
// worker host address map to EndpointRef
- private val workers = mutable.Map[String, RpcEndpointRef]()
private val random = new Random
private var rpcEnv: RpcEnv = _
+ private val scheduler: Scheduler = new Scheduler
+
def this(sparkConf: SparkConf) = {
this(sparkConf, CarbonProperties.getSearchMasterPort)
}
@@ -94,15 +94,15 @@ class Master(sparkConf: SparkConf, port: Int) {
}
def stopAllWorkers(): Unit = {
- val futures = workers.mapValues { ref =>
- ref.ask[ShutdownResponse](ShutdownRequest("user"))
+ val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
+ (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
}
- futures.foreach { case (hostname, future) =>
+ futures.foreach { case (address, future) =>
ThreadUtils.awaitResult(future, Duration.apply("10s"))
future.value match {
case Some(result) =>
result match {
- case Success(response) => workers.remove(hostname)
+ case Success(response) => scheduler.removeWorker(address)
case Failure(throwable) => throw new IOException(throwable.getMessage)
}
case None => throw new ExecutionTimeoutException
@@ -115,29 +115,18 @@ class Master(sparkConf: SparkConf, port: Int) {
LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
s"with ${request.cores} cores")
val workerId = UUID.randomUUID().toString
- val workerHostAddress = request.hostAddress
+ val workerAddress = request.hostAddress
val workerPort = request.port
LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
- val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
- RpcAddress(workerHostAddress, workerPort), "search-service")
-
- workers.put(workerHostAddress, endPointRef)
- LOG.info(s"worker ${request.hostAddress}:${request.port} added")
+ val endPointRef =
+ rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
+ scheduler.addWorker(workerAddress,
+ new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
+ LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
RegisterWorkerResponse(workerId)
}
- private def getEndpoint(workerIP: String) = {
- try {
- workers(workerIP)
- } catch {
- case e: NoSuchElementException =>
- // no local worker available, choose one worker randomly
- val index = new Random().nextInt(workers.size)
- workers.toSeq(index)._2
- }
- }
-
/**
* Execute search by firing RPC call to worker, return the result rows
* @param table table to search
@@ -154,57 +143,65 @@ class Master(sparkConf: SparkConf, port: Int) {
if (globalLimit < 0 || localLimit < 0) {
throw new IllegalArgumentException("limit should be positive")
}
- if (workers.isEmpty) {
- throw new IOException("No worker is available")
- }
val queryId = random.nextInt
+ var rowCount = 0
+ val output = new ArrayBuffer[CarbonRow]
+
+ def onSuccess(result: SearchResult): Unit = {
+ // in case of RPC success, collect all rows in response message
+ if (result.queryId != queryId) {
+ throw new IOException(
+ s"queryId in response does not match request: ${result.queryId} != $queryId")
+ }
+ if (result.status != Status.SUCCESS.ordinal()) {
+ throw new IOException(s"failure in worker: ${ result.message }")
+ }
+
+ val itor = result.rows.iterator
+ while (itor.hasNext && rowCount < globalLimit) {
+ output += new CarbonRow(itor.next())
+ rowCount = rowCount + 1
+ }
+ LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
+ }
+ def onFailure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
+ def onTimedOut() = throw new ExecutionTimeoutException()
+
// prune data and get a mapping of worker hostname to list of blocks,
// then add these blocks to the SearchRequest and fire the RPC call
val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
- val futures = nodeBlockMapping.asScala.map { case (hostname, blocks) =>
+ val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
// Build a SearchRequest
val split = new SerializableWritable[CarbonMultiBlockSplit](
- new CarbonMultiBlockSplit(blocks, hostname))
+ new CarbonMultiBlockSplit(blocks, splitAddress))
val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
- // fire RPC to worker asynchronously
- getEndpoint(hostname).ask[SearchResult](request)
+ // Find an Endpoint and send the request to it
+ // This RPC is non-blocking so we do not need to wait before sending to the next worker
+ scheduler.sendRequestAsync[SearchResult](splitAddress, request)
}
- // get all results from RPC response and return to caller
- var rowCount = 0
- val output = new ArrayBuffer[CarbonRow]
- // Loop to get the result of each Worker
- futures.foreach { future: Future[SearchResult] =>
+ // loop to get the result of each Worker
+ tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
// if we have enough data already, we do not need to collect more result
if (rowCount < globalLimit) {
- // wait on worker for 10s
+ // wait for worker for 10s
ThreadUtils.awaitResult(future, Duration.apply("10s"))
- future.value match {
- case Some(response: Try[SearchResult]) =>
- response match {
- case Success(result) =>
- if (result.queryId != queryId) {
- throw new IOException(
- s"queryId in response does not match request: ${ result.queryId } != $queryId")
- }
- if (result.status != Status.SUCCESS.ordinal()) {
- throw new IOException(s"failure in worker: ${ result.message }")
- }
-
- val itor = result.rows.iterator
- while (itor.hasNext && rowCount < globalLimit) {
- output += new CarbonRow(itor.next())
- rowCount = rowCount + 1
- }
-
- case Failure(e) =>
- throw new IOException(s"exception in worker: ${ e.getMessage }")
- }
- case None =>
- throw new ExecutionTimeoutException()
+ LOG.info(s"[SearchId:$queryId] receive search response from worker " +
+ s"${worker.address}:${worker.port}")
+ try {
+ future.value match {
+ case Some(response: Try[SearchResult]) =>
+ response match {
+ case Success(result) => onSuccess(result)
+ case Failure(e) => onFailure(e)
+ }
+ case None => onTimedOut()
+ }
+ } finally {
+ worker.workload.decrementAndGet()
}
}
}
@@ -230,12 +227,12 @@ class Master(sparkConf: SparkConf, port: Int) {
CarbonLoaderUtil.nodeBlockMapping(
distributables.asJava,
-1,
- workers.keySet.toList.asJava,
+ getWorkers.asJava,
CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST)
}
/** return hostname of all workers */
- def getWorkers: JSet[String] = workers.keySet.asJava
+ def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
}
// Exception if execution timed out in search mode
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
new file mode 100644
index 0000000..26208d0
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send requests to
+ */
+private[rpc] class Scheduler {
+ // mapping of worker IP address to worker instance
+ private val workers = mutable.Map[String, Schedulable]()
+ private val random = new Random()
+
+ private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Pick a Worker according to the split address and the Worker's workload,
+ * invoke the RPC, and return the Future result
+ */
+ def sendRequestAsync[T: ClassTag](
+ splitAddress: String,
+ request: Any): (Schedulable, Future[T]) = {
+ require(splitAddress != null)
+ if (workers.isEmpty) {
+ throw new IOException("No worker is available")
+ }
+ var worker = pickWorker(splitAddress)
+
+ // check whether the worker exceeds the max workload; if so, pick the next worker
+ val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
+ var numTry = workers.size
+ do {
+ if (worker.workload.get() >= maxWorkload) {
+ LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
+ worker = pickNextWorker(worker)
+ numTry = numTry - 1
+ } else {
+ numTry = -1
+ }
+ } while (numTry > 0)
+ if (numTry == 0) {
+ // tried every worker and none is below the workload limit
+ throw new WorkerTooBusyException(
+ s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
+ }
+ LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
+ val future = worker.ref.ask(request)
+ worker.workload.incrementAndGet()
+ (worker, future)
+ }
+
+ private def pickWorker[T: ClassTag](splitAddress: String) = {
+ try {
+ workers(splitAddress)
+ } catch {
+ case e: NoSuchElementException =>
+ // no local worker available, choose one worker randomly
+ pickRandomWorker()
+ }
+ }
+
+ /** pick a worker randomly */
+ private def pickRandomWorker() = {
+ val index = random.nextInt(workers.size)
+ workers.toSeq(index)._2
+ }
+
+ /** pick the worker that follows the given worker in [[Scheduler.workers]], wrapping around */
+ private def pickNextWorker(worker: Schedulable) = {
+ val index = workers.zipWithIndex.find { case ((address, w), index) =>
+ w == worker
+ }.get._2
+ if (index == workers.size - 1) {
+ workers.toSeq.head._2
+ } else {
+ workers.toSeq(index + 1)._2
+ }
+ }
+
+ /** A new searcher is registering; add it to the map so requests can be scheduled to it */
+ def addWorker(address: String, schedulable: Schedulable): Unit = {
+ require(schedulable != null)
+ require(address.equals(schedulable.address))
+ workers(address) = schedulable
+ }
+
+ def removeWorker(address: String): Unit = {
+ workers.remove(address)
+ }
+
+ def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
+}
+
+/**
+ * Represents a Worker to which [[Scheduler]] can send search requests
+ * @param id Worker ID, a UUID string
+ * @param cores number of cores in the Worker
+ * @param ref RPC endpoint reference
+ * @param workload number of outstanding requests sent to the Worker
+ */
+private[rpc] class Schedulable(
+ val id: String,
+ val address: String,
+ val port: Int,
+ val cores: Int,
+ val ref: RpcEndpointRef,
+ var workload: AtomicInteger) {
+ def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
+ this(id, address, port, cores, ref, new AtomicInteger())
+ }
+}
+
+class WorkerTooBusyException(message: String) extends RuntimeException(message)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
index 39be35f..0f2138a 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties
@InterfaceAudience.Internal
object Worker {
private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- private var hostAddress = InetAddress.getLocalHost.getHostAddress
+ private val hostAddress = InetAddress.getLocalHost.getHostAddress
private var port: Int = _
def init(masterHostAddress: String, masterPort: Int): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index 4ed796e..e467fd3 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -54,7 +54,7 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
// Search request sent from master to worker
case class SearchRequest(
- queryId: Int,
+ searchId: Int,
split: SerializableWritable[CarbonMultiBlockSplit],
tableInfo: TableInfo,
projectColumns: Array[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
new file mode 100644
index 0000000..8780dc0
--- /dev/null
+++ b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
+
+ var scheduler: Scheduler = _
+ var w1: Schedulable = _
+ var w2: Schedulable = _
+ var w3: Schedulable = _
+
+ override def beforeEach(): Unit = {
+ scheduler = new Scheduler()
+ w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
+ w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
+ w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
+
+ scheduler.addWorker("1.1.1.1", w1)
+ scheduler.addWorker("1.1.1.2", w2)
+ scheduler.addWorker("1.1.1.3", w3)
+ }
+
+ test("test addWorker, removeWorker, getAllWorkers") {
+ assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+ scheduler.removeWorker("1.1.1.2")
+ assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+ val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
+ scheduler.addWorker("1.1.1.4", w4)
+ assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
+ assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
+ }
+
+ test("test normal schedule") {
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ assertResult(w1.id)(r1.id)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ assertResult(w2.id)(r2.id)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ assertResult(w3.id)(r3.id)
+ val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ assertResult(w1.id)(r4.id)
+ val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ assertResult(w2.id)(r5.id)
+ val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ assertResult(w3.id)(r6.id)
+ }
+
+ test("test worker unavailable") {
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
+ assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
+ }
+
+ test("test reschedule when target worker is overload") {
+ // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+ (1 to 40).foreach { i =>
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+ val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ // it must be worker1 since worker3 exceeded its max workload
+ assertResult(w1.id)(r.id)
+ }
+
+ test("test all workers are overload") {
+ // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+ (1 to 40).foreach { i =>
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ val e = intercept[WorkerTooBusyException] {
+ scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+ }
+
+ test("test user configured overload param") {
+ val original = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
+
+ (1 to 3).foreach { i =>
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ val e = intercept[WorkerTooBusyException] {
+ scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ if (original != null) {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
+ }
+ }
+
+ test("test invalid property") {
+ intercept[IllegalArgumentException] {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
+ }
+ var value = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+ assertResult(null)(value)
+
+ intercept[NumberFormatException] {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
+ }
+ value = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+ assertResult(null)(value)
+ }
+}
+
+class DummyRef extends RpcEndpointRef(new SparkConf) {
+ override def address: RpcAddress = null
+
+ override def name: String = ""
+
+ override def send(message: Any): Unit = { }
+
+ override def ask[T](message: Any, timeout: RpcTimeout)
+ (implicit evidence$1: ClassTag[T]): Future[T] = null
+}
\ No newline at end of file