Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:49 UTC
[22/50] [abbrv] carbondata git commit: [HOTFIX] Remove search mode module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index 3b5b5ca..4985718 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -294,17 +294,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
val expectedAnswer1 = sql(s"select * from $normalTable where id = 1").collect()
val expectedAnswer2 = sql(s"select * from $normalTable where city in ('city_999')").collect()
- carbonSession.startSearchMode()
- assert(carbonSession.isSearchModeEnabled)
-
- checkAnswer(
- sql(s"select * from $bloomDMSampleTable where id = 1"), expectedAnswer1)
- checkAnswer(
- sql(s"select * from $bloomDMSampleTable where city in ('city_999')"), expectedAnswer2)
-
- carbonSession.stopSearchMode()
- assert(!carbonSession.isSearchModeEnabled)
-
sql(s"DROP TABLE IF EXISTS $normalTable")
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}
@@ -975,10 +964,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
}
override protected def afterAll(): Unit = {
- // in case a search mode test case failed, stop search mode again
- if (carbonSession.isSearchModeEnabled) {
- carbonSession.stopSearchMode()
- }
deleteFile(bigFile)
deleteFile(smallFile)
sql(s"DROP TABLE IF EXISTS $normalTable")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 709f346..a37bf30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,6 @@
<module>integration/spark-common-test</module>
<module>datamap/examples</module>
<module>store/sdk</module>
- <module>store/search</module>
<module>assembly</module>
<module>tools/cli</module>
</modules>
@@ -536,8 +535,6 @@
<sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
</sourceDirectories>
@@ -599,8 +596,6 @@
<sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
</sourceDirectories>
@@ -658,8 +653,6 @@
<sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
- <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
</sourceDirectories>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
deleted file mode 100644
index 2e2628a..0000000
--- a/store/search/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-parent</artifactId>
- <version>1.5.1-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>carbondata-search</artifactId>
- <name>Apache CarbonData :: Search</name>
-
- <properties>
- <dev.path>${basedir}/../../dev</dev.path>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <!-- Note config is repeated in surefire config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>CarbonTestSuite.txt</filereports>
- <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- </argLine>
- <stderr />
- <environmentVariables>
- </environmentVariables>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
- </systemProperties>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
deleted file mode 100644
index 6492a9b..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.spark.search.SearchRequest;
-import org.apache.spark.search.SearchResult;
-import org.apache.spark.search.ShutdownRequest;
-import org.apache.spark.search.ShutdownResponse;
-
-/**
- * Thread runnable for handling SearchRequest from master.
- */
-@InterfaceAudience.Internal
-public class SearchRequestHandler {
-
- private static final Logger LOG =
- LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
-
- public SearchResult handleSearch(SearchRequest request) {
- try {
- LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
- List<CarbonRow> rows = handleRequest(request);
- LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
- return createSuccessResponse(request, rows);
- } catch (IOException e) {
- LOG.error(e);
- LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
- return createFailureResponse(request, e);
- }
- }
-
- public ShutdownResponse handleShutdown(ShutdownRequest request) {
- LOG.info("Shutting down worker...");
- SearchModeDetailQueryExecutor.shutdownThreadPool();
- SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
- LOG.info("Worker shutted down");
- return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
- }
-
- private DataMapExprWrapper chooseFGDataMap(
- CarbonTable table,
- FilterResolverIntf filterInterface) throws IOException {
- DataMapChooser chooser = new DataMapChooser(table);
- return chooser.chooseFGDataMap(filterInterface);
- }
-
- /**
- * Builds {@link QueryModel} and read data from files
- */
- private List<CarbonRow> handleRequest(SearchRequest request)
- throws IOException {
- CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
- carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
- ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
- TableInfo tableInfo = request.tableInfo();
- CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
- QueryModel queryModel = createQueryModel(table, request);
-
- // in search mode, plain reader is better since it requires less memory
- queryModel.setVectorReader(false);
-
- CarbonMultiBlockSplit mbSplit = request.split().value();
- List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
- queryModel.setTableBlockInfos(list);
- long limit = request.limit();
- long rowCount = 0;
-
- LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
- request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
- DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
- queryModel.getFilterExpressionResolverTree());
-
- // If a DataMap was selected in the Master, prune the split with it
- if (fgDataMap != null) {
- queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
- }
-
- // In search mode, reader will read multiple blocks by using a thread pool
- CarbonRecordReader<CarbonRow> reader =
- new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration());
-
- // read all rows by the reader
- List<CarbonRow> rows = new LinkedList<>();
- try {
- reader.initialize(mbSplit, null);
-
- // loop to read required number of rows.
- // By default, if user does not specify the limit value, limit is Long.MaxValue
- while (reader.nextKeyValue() && rowCount < limit) {
- rows.add(reader.getCurrentValue());
- rowCount++;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- reader.close();
- }
- LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
- request.searchId(), rows.size()));
- return rows;
- }
-
- /**
- * If an FG DataMap is defined for this table and the query has a filter condition,
- * prune the splits using the DataMap, set the pruned splits into the QueryModel and return it
- */
- private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
- CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
- Objects.requireNonNull(datamap);
- List<Segment> segments = new LinkedList<>();
- HashMap<String, Integer> uniqueSegments = new HashMap<>();
- LoadMetadataDetails[] loadMetadataDetails =
- SegmentStatusManager.readLoadMetadata(
- CarbonTablePath.getMetadataPath(table.getTablePath()));
- for (CarbonInputSplit split : mbSplit.getAllSplits()) {
- String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
- if (uniqueSegments.get(segmentId) == null) {
- segments.add(Segment.toSegment(segmentId,
- new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
- loadMetadataDetails, FileFactory.getConfiguration())));
- uniqueSegments.put(segmentId, 1);
- } else {
- uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
- }
- }
-
- List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
- List<ExtendedBlocklet> prunedBlocklets = new LinkedList<ExtendedBlocklet>();
- for (int i = 0; i < distributables.size(); i++) {
- DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
- prunedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
- }
-
- HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
- for (ExtendedBlocklet prunedBlocklet : prunedBlocklets) {
- pathToRead.put(prunedBlocklet.getFilePath().replace('\\', '/'), prunedBlocklet);
- }
-
- List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
- List<TableBlockInfo> blockToRead = new LinkedList<>();
- for (TableBlockInfo block : blocks) {
- if (pathToRead.keySet().contains(block.getFilePath())) {
- // If this is not set, the FineGrainBlocklet object cannot be created in
- // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
- block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
- blockToRead.add(block);
- }
- }
- LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
- blockToRead.size()));
- queryModel.setTableBlockInfos(blockToRead);
- queryModel.setFG(true);
- return queryModel;
- }
-
- private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
- String[] projectColumns = request.projectColumns();
- Expression filter = null;
- if (request.filterExpression() != null) {
- filter = request.filterExpression();
- }
- return new QueryModelBuilder(table)
- .projectColumns(projectColumns)
- .filterExpression(filter)
- .build();
- }
-
- /**
- * create a failure response
- */
- private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
- return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
- new Object[0][]);
- }
-
- /**
- * create a success response with result rows
- */
- private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
- Iterator<CarbonRow> itor = rows.iterator();
- Object[][] output = new Object[rows.size()][];
- int i = 0;
- while (itor.hasNext()) {
- output[i++] = itor.next().getData();
- }
- return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
- }
-
-}
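
For readers skimming this removed handler: the core of prune() above is a path join between the blocklets surviving the FG DataMap and the input blocks. Below is a minimal Scala sketch of that idea; the Blocklet/BlockInfo types and all names are illustrative placeholders under stated assumptions, not CarbonData classes.

final case class Blocklet(filePath: String, dataMapWriterPath: String)
final case class BlockInfo(filePath: String, var dataMapWriterPath: String = null)

def pruneByDataMap(blocks: Seq[BlockInfo], survivors: Seq[Blocklet]): Seq[BlockInfo] = {
  // index surviving blocklets by normalized path (Windows separators replaced)
  val byPath = survivors.map(b => b.filePath.replace('\\', '/') -> b).toMap
  blocks.flatMap { block =>
    byPath.get(block.filePath).map { hit =>
      // carry the writer path forward, as the real code does, so the reader
      // can later build the fine-grained blocklet
      block.dataMapWriterPath = hit.dataMapWriterPath
      block
    }
  }
}
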
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
deleted file mode 100644
index 71df3e0..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
- SUCCESS, FAILURE
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
deleted file mode 100644
index 97951ea..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search._
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.worker.Status
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-@InterfaceAudience.Internal
-class Master(sparkConf: SparkConf) {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- // mapping of worker host address to EndpointRef
-
- private val random = new Random
-
- private var rpcEnv: RpcEnv = _
-
- private val scheduler: Scheduler = new Scheduler
-
- /** start service and listen on the master port from CarbonProperties */
- def startService(): Unit = {
- if (rpcEnv == null) {
- LOG.info("Start search mode master thread")
- val isStarted: AtomicBoolean = new AtomicBoolean(false)
- new Thread(new Runnable {
- override def run(): Unit = {
- val hostAddress = InetAddress.getLocalHost.getHostAddress
- var port = CarbonProperties.getSearchMasterPort
- var exception: BindException = null
- var numTry = 100 // we will try to create the service at worst 100 times
- do {
- try {
- LOG.info(s"starting registry-service on $hostAddress:$port")
- val config = RpcUtil.getRpcEnvConfig(
- sparkConf, "registry-service", hostAddress, "", port,
- new SecurityManager(sparkConf), clientMode = false)
- rpcEnv = new NettyRpcEnvFactory().create(config)
- numTry = 0
- } catch {
- case e: BindException =>
- // port is occupied, increase the port number and try again
- exception = e
- LOG.error(s"start registry-service failed: ${e.getMessage}")
- port = port + 1
- numTry = numTry - 1
- }
- } while (numTry > 0)
- if (rpcEnv == null) {
- // we have tried many times, but still failed to find an available port
- throw exception
- }
- val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
- rpcEnv.setupEndpoint("registry-service", registryEndpoint)
- if (isStarted.compareAndSet(false, false)) {
- synchronized {
- isStarted.compareAndSet(false, true)
- }
- }
- LOG.info("registry-service started")
- rpcEnv.awaitTermination()
- }
- }).start()
- var count = 0
- val countThreshold = 5000
- while (isStarted.compareAndSet(false, false) && count < countThreshold) {
- LOG.info(s"Waiting search mode master to start, retrying $count times")
- Thread.sleep(10)
- count = count + 1;
- }
- if (count >= countThreshold) {
- LOG.error(s"Search mode try $countThreshold times to start master but failed")
- throw new RuntimeException(
- s"Search mode try $countThreshold times to start master but failed")
- } else {
- LOG.info("Search mode master started")
- }
- } else {
- LOG.info("Search mode master has already started")
- }
- }
-
- def stopService(): Unit = {
- if (rpcEnv != null) {
- rpcEnv.shutdown()
- rpcEnv = null
- }
- }
-
- def stopAllWorkers(): Unit = {
- val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
- (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
- }
- futures.foreach { case (address, future) =>
- ThreadUtils.awaitResult(future, Duration.apply("10s"))
- future.value match {
- case Some(result) =>
- result match {
- case Success(response) => scheduler.removeWorker(address)
- case Failure(throwable) => throw new IOException(throwable.getMessage)
- }
- case None => throw new ExecutionTimeoutException
- }
- }
- }
-
- /** A new searcher is registering; add it to the map and connect to it */
- def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
- LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
- s"with ${request.cores} cores")
- val workerId = UUID.randomUUID().toString
- val workerAddress = request.hostAddress
- val workerPort = request.port
- LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
-
- val endPointRef =
- rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
- scheduler.addWorker(workerAddress,
- new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
- LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
- RegisterWorkerResponse(workerId)
- }
-
- /**
- * Execute search by firing RPC call to worker, return the result rows
- * @param table table to search
- * @param columns projection column names
- * @param filter filter expression
- * @param globalLimit max number of rows required in Master
- * @param localLimit max number of rows required in Worker
- * @return
- */
- def search(table: CarbonTable, columns: Array[String], filter: Expression,
- globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
- Objects.requireNonNull(table)
- Objects.requireNonNull(columns)
- if (globalLimit < 0 || localLimit < 0) {
- throw new IllegalArgumentException("limit should be positive")
- }
-
- val queryId = random.nextInt
- var rowCount = 0
- val output = new ArrayBuffer[CarbonRow]
-
- def onSuccess(result: SearchResult): Unit = {
- // in case of RPC success, collect all rows in response message
- if (result.queryId != queryId) {
- throw new IOException(
- s"queryId in response does not match request: ${result.queryId} != $queryId")
- }
- if (result.status != Status.SUCCESS.ordinal()) {
- throw new IOException(s"failure in worker: ${ result.message }")
- }
-
- val itor = result.rows.iterator
- while (itor.hasNext && rowCount < globalLimit) {
- output += new CarbonRow(itor.next())
- rowCount = rowCount + 1
- }
- LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
- }
- def onFailure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
- def onTimedout() = throw new ExecutionTimeoutException()
-
- // prune data and get a mapping of worker hostname to list of blocks,
- // then add these blocks to the SearchRequest and fire the RPC call
- val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
- val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
- // Build a SearchRequest
- val split = new SerializableWritable[CarbonMultiBlockSplit](
- new CarbonMultiBlockSplit(blocks, splitAddress))
- val request =
- SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
- // Find an Endpoint and send the request to it
- // This RPC is non-blocking so that we do not need to wait before sending to the next worker
- scheduler.sendRequestAsync[SearchResult](splitAddress, request)
- }
-
- // loop to get the result of each Worker
- tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
-
- // if we have enough data already, we do not need to collect more results
- if (rowCount < globalLimit) {
- // wait for worker
- val timeout = CarbonProperties
- .getInstance()
- .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
- CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
- ThreadUtils.awaitResult(future, Duration.apply(timeout))
- LOG.info(s"[SearchId:$queryId] receive search response from worker " +
- s"${worker.address}:${worker.port}")
- try {
- future.value match {
- case Some(response: Try[SearchResult]) =>
- response match {
- case Success(result) => onSuccess(result)
- case Failure(e) => onFailure(e)
- }
- case None => onTimedout()
- }
- } finally {
- worker.workload.decrementAndGet()
- }
- }
- }
- output.toArray
- }
-
- /**
- * Prune data by using CarbonInputFormat.getSplit
- * Return a mapping of host address to list of block
- */
- private def pruneBlock(
- table: CarbonTable,
- columns: Array[String],
- filter: Expression): JMap[String, JList[Distributable]] = {
- val jobConf = new JobConf(new Configuration)
- val job = new Job(jobConf)
- val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
- job, table, columns, filter, null, null)
-
- // We will do FG pruning on the reader side, so don't do it here
- CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
- val splits = format.getSplits(job)
- val distributables = splits.asScala.map { split =>
- split.asInstanceOf[Distributable]
- }
- CarbonLoaderUtil.nodeBlockMapping(
- distributables.asJava,
- -1,
- getWorkers.asJava,
- CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
- null)
- }
-
- /** return hostname of all workers */
- def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception if execution timed out in search mode
-class ExecutionTimeoutException extends RuntimeException
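
The search() method above follows a fire-all-then-collect pattern: one asynchronous RPC per worker, then a bounded await loop that stops accumulating once globalLimit rows have arrived. A dependency-free Scala sketch of that pattern follows; all names and the default timeout are illustrative assumptions, not part of this module.

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

def fanOutAndCollect[Row](
    requests: Seq[() => Future[Seq[Row]]],   // one deferred RPC per worker
    globalLimit: Int,
    timeout: Duration = 10.seconds): Seq[Row] = {
  // fire every request first so all workers run in parallel
  val futures = requests.map(_.apply())
  val out = ArrayBuffer[Row]()
  futures.foreach { future =>
    // once we have enough rows, skip waiting on the remaining workers
    if (out.size < globalLimit) {
      out ++= Await.result(future, timeout).take(globalLimit - out.size)
    }
  }
  out.toSeq
}
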
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala b/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
deleted file mode 100644
index f15bb8f..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
-import org.apache.spark.util.Utils
-
-object RpcUtil {
-
- def getRpcEnvConfig(conf: SparkConf,
- name: String,
- bindAddress: String,
- advertiseAddress: String,
- port: Int,
- securityManager: SecurityManager,
- clientMode: Boolean): RpcEnvConfig = {
- val className = "org.apache.spark.rpc.RpcEnvConfig"
- if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
- createObject(className, conf, name, bindAddress,
- advertiseAddress, port.asInstanceOf[Object],
- securityManager, clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
- } else if (SPARK_VERSION.startsWith("2.3")) {
- // if numUsableCores is 0, Spark will use the available CPUs on the host
- val numUsableCores: Int = 0
- createObject(className, conf, name, bindAddress,
- advertiseAddress, port.asInstanceOf[Object],
- securityManager, numUsableCores.asInstanceOf[Object],
- clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
- }
-
- def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
- val clazz = Utils.classForName(className)
- val ctor = clazz.getConstructors.head
- ctor.setAccessible(true)
- (ctor.newInstance(conArgs: _*), clazz)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
deleted file mode 100644
index 26208d0..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request
- */
-private[rpc] class Scheduler {
- // mapping of worker IP address to worker instance
- private val workers = mutable.Map[String, Schedulable]()
- private val random = new Random()
-
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * Pick a Worker according to the address and workload of the Worker
- * Invoke the RPC and return Future result
- */
- def sendRequestAsync[T: ClassTag](
- splitAddress: String,
- request: Any): (Schedulable, Future[T]) = {
- require(splitAddress != null)
- if (workers.isEmpty) {
- throw new IOException("No worker is available")
- }
- var worker = pickWorker(splitAddress)
-
- // check whether the worker exceeds the max workload; if so, pick the next worker
- val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
- var numTry = workers.size
- do {
- if (worker.workload.get() >= maxWorkload) {
- LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
- worker = pickNextWorker(worker)
- numTry = numTry - 1
- } else {
- numTry = -1
- }
- } while (numTry > 0)
- if (numTry == 0) {
- // tried many times and still could not find an available Worker
- throw new WorkerTooBusyException(
- s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
- }
- LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
- val future = worker.ref.ask(request)
- worker.workload.incrementAndGet()
- (worker, future)
- }
-
- private def pickWorker[T: ClassTag](splitAddress: String) = {
- try {
- workers(splitAddress)
- } catch {
- case e: NoSuchElementException =>
- // no local worker available, choose one worker randomly
- pickRandomWorker()
- }
- }
-
- /** pick a worker randomly */
- private def pickRandomWorker() = {
- val index = random.nextInt(workers.size)
- workers.toSeq(index)._2
- }
-
- /** pick the worker after the given one in [[Scheduler.workers]] */
- private def pickNextWorker(worker: Schedulable) = {
- val index = workers.zipWithIndex.find { case ((address, w), index) =>
- w == worker
- }.get._2
- if (index == workers.size - 1) {
- workers.toSeq.head._2
- } else {
- workers.toSeq(index + 1)._2
- }
- }
-
- /** A new searcher is registering; add it to the map */
- def addWorker(address: String, schedulable: Schedulable): Unit = {
- require(schedulable != null)
- require(address.equals(schedulable.address))
- workers(address) = schedulable
- }
-
- def removeWorker(address: String): Unit = {
- workers.remove(address)
- }
-
- def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represents a Worker to which [[Scheduler]] can send
- * search requests
- * @param id Worker ID, a UUID string
- * @param cores number of cores in the Worker
- * @param ref RPC endpoint reference
- * @param workload number of outstanding requests sent to the Worker
- */
-private[rpc] class Schedulable(
- val id: String,
- val address: String,
- val port: Int,
- val cores: Int,
- val ref: RpcEndpointRef,
- var workload: AtomicInteger) {
- def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
- this(id, address, port, cores, ref, new AtomicInteger())
- }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)
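
As a minimal sketch of the scheduling policy above: prefer the worker co-located with the split, step to the next worker when one is at its workload limit, and fail only when every worker is busy. The sketch below is dependency-free Scala; TinyScheduler, WorkerRef, and the limit handling are illustrative assumptions, not CarbonData APIs.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.util.Random

final case class WorkerRef(id: String, address: String,
    workload: AtomicInteger = new AtomicInteger())

class TinyScheduler(maxWorkload: Int) {
  private val workers = mutable.LinkedHashMap[String, WorkerRef]()
  private val random = new Random()

  def addWorker(w: WorkerRef): Unit = workers(w.address) = w

  // prefer the worker co-located with the split, else pick one at random
  private def preferred(address: String): WorkerRef =
    workers.getOrElse(address, workers.values.toIndexedSeq(random.nextInt(workers.size)))

  // walk past overloaded workers; fail only when every worker is at the limit
  def pick(address: String): WorkerRef = {
    if (workers.isEmpty) throw new IllegalStateException("No worker is available")
    val ordered = workers.values.toIndexedSeq
    var worker = preferred(address)
    var numTry = ordered.size
    while (worker.workload.get() >= maxWorkload && numTry > 0) {
      worker = ordered((ordered.indexOf(worker) + 1) % ordered.size)
      numTry -= 1
    }
    if (worker.workload.get() >= maxWorkload) {
      throw new IllegalStateException(s"All workers are busy, workload limit: $maxWorkload")
    }
    worker.workload.incrementAndGet() // one more outstanding request on this worker
    worker
  }
}

A caller would decrement workload once the worker's reply arrives, mirroring the finally block in Master.search above.
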
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
deleted file mode 100644
index 08baeeb..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-@InterfaceAudience.Internal
-object Worker {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- private val hostAddress = InetAddress.getLocalHost.getHostAddress
- private var port: Int = _
-
- def init(masterHostAddress: String, masterPort: Int): Unit = {
- LOG.info(s"initializing worker...")
- startService()
- LOG.info(s"registering to master $masterHostAddress:$masterPort")
- val workerId = registerToMaster(masterHostAddress, masterPort)
- LOG.info(s"worker registered to master, workerId: $workerId")
- }
-
- /**
- * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
- */
- private def startService(): Unit = {
- new Thread(new Runnable {
- override def run(): Unit = {
- port = CarbonProperties.getSearchWorkerPort
- val conf = new SparkConf()
- var rpcEnv: RpcEnv = null
- var exception: BindException = null
- var numTry = 100 // we will try to create the service at worst 100 times
- do {
- try {
- LOG.info(s"starting search-service on $hostAddress:$port")
- val config = RpcUtil.getRpcEnvConfig(
- conf, s"worker-$hostAddress", hostAddress, "", port,
- new SecurityManager(conf), clientMode = false)
- rpcEnv = new NettyRpcEnvFactory().create(config)
- numTry = 0
- } catch {
- case e: BindException =>
- // port is occupied, increase the port number and try again
- exception = e
- LOG.error(s"start search-service failed: ${e.getMessage}")
- port = port + 1
- numTry = numTry - 1
- }
- } while (numTry > 0)
- if (rpcEnv == null) {
- // we have tried many times, but still failed to find an available port
- throw exception
- }
- val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
- rpcEnv.setupEndpoint("search-service", searchEndpoint)
- LOG.info("search-service started")
- rpcEnv.awaitTermination()
- }
- }).start()
- }
-
- private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
- LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
- val conf = new SparkConf()
- val config = RpcUtil.getRpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
- new SecurityManager(conf), clientMode = true)
- val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
-
- val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
- RpcAddress(masterHostAddress, masterPort), "registry-service")
- val cores = Runtime.getRuntime.availableProcessors()
-
- val request = RegisterWorkerRequest(hostAddress, port, cores)
- val future = endPointRef.ask[RegisterWorkerResponse](request)
- ThreadUtils.awaitResult(future, Duration.apply("10s"))
- future.value match {
- case Some(result) =>
- result match {
- case Success(response) =>
- LOG.info("worker registered")
- response.workerId
- case Failure(throwable) =>
- LOG.error(s"worker failed to registered: $throwable")
- throw new IOException(throwable.getMessage)
- }
- case None =>
- LOG.error("worker register timeout")
- throw new ExecutionTimeoutException
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
deleted file mode 100644
index 22e766d..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * Registry service implementation. It adds worker to master.
- */
-class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- override def onStart(): Unit = {
- LOG.info("Registry Endpoint started")
- }
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case req@RegisterWorkerRequest(_, _, _) =>
- val response = master.addWorker(req)
- context.reply(response)
- }
-
- override def onStop(): Unit = {
- LOG.info("Registry Endpoint stopped")
- }
-
-}
-
-case class RegisterWorkerRequest(
- hostAddress: String,
- port: Int,
- cores: Int)
-
-case class RegisterWorkerResponse(
- workerId: String)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
deleted file mode 100644
index 6fbea15..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.SerializableWritable
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.store.worker.SearchRequestHandler
-
-/**
- * Search service implementation
- */
-class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
- override def onStart(): Unit = {
- LOG.info("Searcher Endpoint started")
- }
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case req: SearchRequest =>
- val response = new SearchRequestHandler().handleSearch(req)
- context.reply(response)
-
- case req: ShutdownRequest =>
- val response = new SearchRequestHandler().handleShutdown(req)
- context.reply(response)
-
- }
-
- override def onStop(): Unit = {
- LOG.info("Searcher Endpoint stopped")
- }
-}
-
-// Search request sent from master to worker
-case class SearchRequest(
- searchId: Int,
- split: SerializableWritable[CarbonMultiBlockSplit],
- tableInfo: TableInfo,
- projectColumns: Array[String],
- filterExpression: Expression,
- limit: Long)
-
-// Search result sent from worker to master
-case class SearchResult(
- queryId: Int,
- status: Int,
- message: String,
- rows: Array[Array[Object]])
-
-// Shutdown request sent from master to worker
-case class ShutdownRequest(
- reason: String)
-
-// Shutdown response sent from worker to master
-case class ShutdownResponse(
- status: Int,
- message: String)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
deleted file mode 100644
index 88d925f..0000000
--- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-public class SearchServiceTest {
-// @Test
-// public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
-// Master master = new Master(9999);
-// master.startService();
-//
-// Worker worker = Worker.getInstance();
-// worker.init(InetAddress.getLocalHost().getHostName(), 9999);
-//
-// Set<String> workers = master.getWorkers();
-// Assert.assertEquals(1, workers.size());
-// Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
-//
-// master.stopAllWorkers();
-// master.stopService();
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
deleted file mode 100644
index 8780dc0..0000000
--- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
- var scheduler: Scheduler = _
- var w1: Schedulable = _
- var w2: Schedulable = _
- var w3: Schedulable = _
-
- override def beforeEach(): Unit = {
- scheduler = new Scheduler()
- w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
- w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
- w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
- scheduler.addWorker("1.1.1.1", w1)
- scheduler.addWorker("1.1.1.2", w2)
- scheduler.addWorker("1.1.1.3", w3)
- }
-
- test("test addWorker, removeWorker, getAllWorkers") {
- assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
- scheduler.removeWorker("1.1.1.2")
- assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
- val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
- scheduler.addWorker("1.1.1.4", w4)
- assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
- assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
- }
-
- test("test normal schedule") {
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- assertResult(w1.id)(r1.id)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- assertResult(w2.id)(r2.id)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- assertResult(w3.id)(r3.id)
- val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- assertResult(w1.id)(r4.id)
- val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- assertResult(w2.id)(r5.id)
- val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- assertResult(w3.id)(r6.id)
- }
-
- test("test worker unavailable") {
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
- assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
- }
-
- test("test reschedule when target worker is overload") {
- // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
- (1 to 40).foreach { i =>
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
- val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- // it must be worker1 since worker3 exceeds the max workload
- assertResult(w1.id)(r.id)
- }
-
- test("test all workers are overload") {
- // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
- (1 to 40).foreach { i =>
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- val e = intercept[WorkerTooBusyException] {
- scheduler.sendRequestAsync("1.1.1.3", null)
- }
- }
-
- test("test user configured overload param") {
- val original = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
- (1 to 3).foreach { i =>
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- val e = intercept[WorkerTooBusyException] {
- scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- if (original != null) {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
- }
- }
-
- test("test invalid property") {
- intercept[IllegalArgumentException] {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
- }
- var value = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
- assertResult(null)(value)
-
- intercept[NumberFormatException] {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
- }
- value = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
- assertResult(null)(value)
- }
-}
-
-class DummyRef extends RpcEndpointRef(new SparkConf) {
- override def address: RpcAddress = null
-
- override def name: String = ""
-
- override def send(message: Any): Unit = { }
-
- override def ask[T](message: Any, timeout: RpcTimeout)
- (implicit evidence$1: ClassTag[T]): Future[T] = null
-}
\ No newline at end of file