You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:49 UTC

[22/50] [abbrv] carbondata git commit: [HOTFIX] Remove search mode module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index 3b5b5ca..4985718 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -294,17 +294,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
     val expectedAnswer1 = sql(s"select * from $normalTable where id = 1").collect()
     val expectedAnswer2 = sql(s"select * from $normalTable where city in ('city_999')").collect()
 
-    carbonSession.startSearchMode()
-    assert(carbonSession.isSearchModeEnabled)
-
-    checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where id = 1"), expectedAnswer1)
-    checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where city in ('city_999')"), expectedAnswer2)
-
-    carbonSession.stopSearchMode()
-    assert(!carbonSession.isSearchModeEnabled)
-
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
@@ -975,10 +964,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
   }
 
   override protected def afterAll(): Unit = {
-    // in case of search mode test case failed, stop search mode again
-    if (carbonSession.isSearchModeEnabled) {
-      carbonSession.stopSearchMode()
-    }
     deleteFile(bigFile)
     deleteFile(smallFile)
     sql(s"DROP TABLE IF EXISTS $normalTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 709f346..a37bf30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,6 @@
     <module>integration/spark-common-test</module>
     <module>datamap/examples</module>
     <module>store/sdk</module>
-    <module>store/search</module>
     <module>assembly</module>
     <module>tools/cli</module>
   </modules>
@@ -536,8 +535,6 @@
                 <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
               </sourceDirectories>
@@ -599,8 +596,6 @@
                 <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
               </sourceDirectories>
@@ -658,8 +653,6 @@
                 <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-                <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
               </sourceDirectories>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
deleted file mode 100644
index 2e2628a..0000000
--- a/store/search/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.carbondata</groupId>
-    <artifactId>carbondata-parent</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>carbondata-search</artifactId>
-  <name>Apache CarbonData :: Search </name>
-
-  <properties>
-    <dev.path>${basedir}/../../dev</dev.path>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
deleted file mode 100644
index 6492a9b..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.spark.search.SearchRequest;
-import org.apache.spark.search.SearchResult;
-import org.apache.spark.search.ShutdownRequest;
-import org.apache.spark.search.ShutdownResponse;
-
-/**
- * Thread runnable for handling SearchRequest from master.
- */
-@InterfaceAudience.Internal
-public class SearchRequestHandler {
-
-  private static final Logger LOG =
-      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
-
-  public SearchResult handleSearch(SearchRequest request) {
-    try {
-      LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
-      List<CarbonRow> rows = handleRequest(request);
-      LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
-      return createSuccessResponse(request, rows);
-    } catch (IOException e) {
-      LOG.error(e);
-      LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
-      return createFailureResponse(request, e);
-    }
-  }
-
-  public ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOG.info("Shutting down worker...");
-    SearchModeDetailQueryExecutor.shutdownThreadPool();
-    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOG.info("Worker shutted down");
-    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
-  }
-
-  private DataMapExprWrapper chooseFGDataMap(
-          CarbonTable table,
-          FilterResolverIntf filterInterface) throws IOException {
-    DataMapChooser chooser = new DataMapChooser(table);
-    return chooser.chooseFGDataMap(filterInterface);
-  }
-
-  /**
-   * Builds {@link QueryModel} and read data from files
-   */
-  private List<CarbonRow> handleRequest(SearchRequest request)
-      throws IOException {
-    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
-    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
-    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-    TableInfo tableInfo = request.tableInfo();
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
-    QueryModel queryModel = createQueryModel(table, request);
-
-    // in search mode, plain reader is better since it requires less memory
-    queryModel.setVectorReader(false);
-
-    CarbonMultiBlockSplit mbSplit = request.split().value();
-    List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
-    queryModel.setTableBlockInfos(list);
-    long limit = request.limit();
-    long rowCount = 0;
-
-    LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
-        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
-    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
-            queryModel.getFilterExpressionResolverTree());
-
-    // If there is DataMap selected in Master, prune the split by it
-    if (fgDataMap != null) {
-      queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
-    }
-
-    // In search mode, reader will read multiple blocks by using a thread pool
-    CarbonRecordReader<CarbonRow> reader =
-        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration());
-
-    // read all rows by the reader
-    List<CarbonRow> rows = new LinkedList<>();
-    try {
-      reader.initialize(mbSplit, null);
-
-      // loop to read required number of rows.
-      // By default, if user does not specify the limit value, limit is Long.MaxValue
-      while (reader.nextKeyValue() && rowCount < limit) {
-        rows.add(reader.getCurrentValue());
-        rowCount++;
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      reader.close();
-    }
-    LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
-        request.searchId(), rows.size()));
-    return rows;
-  }
-
-  /**
-   * If there is FGDataMap defined for this table and filter condition in the query,
-   * prune the splits by the DataMap and set the pruned split into the QueryModel and return
-   */
-  private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
-      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
-    Objects.requireNonNull(datamap);
-    List<Segment> segments = new LinkedList<>();
-    HashMap<String, Integer> uniqueSegments = new HashMap<>();
-    LoadMetadataDetails[] loadMetadataDetails =
-        SegmentStatusManager.readLoadMetadata(
-            CarbonTablePath.getMetadataPath(table.getTablePath()));
-    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
-      if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(segmentId,
-            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-                loadMetadataDetails, FileFactory.getConfiguration())));
-        uniqueSegments.put(segmentId, 1);
-      } else {
-        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
-      }
-    }
-
-    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
-    List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
-    for (int i = 0; i < distributables.size(); i++) {
-      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
-      prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
-    }
-
-    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
-    for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
-      pathToRead.put(prunedBlocklet.getFilePath().replace('\\', '/'), prunedBlocklet);
-    }
-
-    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
-    List<TableBlockInfo> blockToRead = new LinkedList<>();
-    for (TableBlockInfo block : blocks) {
-      if (pathToRead.keySet().contains(block.getFilePath())) {
-        // If not set this, it will can't create FineGrainBlocklet object in
-        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
-        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
-        blockToRead.add(block);
-      }
-    }
-    LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
-        blockToRead.size()));
-    queryModel.setTableBlockInfos(blockToRead);
-    queryModel.setFG(true);
-    return queryModel;
-  }
-
-  private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
-    String[] projectColumns = request.projectColumns();
-    Expression filter = null;
-    if (request.filterExpression() != null) {
-      filter = request.filterExpression();
-    }
-    return new QueryModelBuilder(table)
-        .projectColumns(projectColumns)
-        .filterExpression(filter)
-        .build();
-  }
-
-  /**
-   * create a failure response
-   */
-  private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
-    return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
-        new Object[0][]);
-  }
-
-  /**
-   * create a success response with result rows
-   */
-  private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
-    Iterator<CarbonRow> itor = rows.iterator();
-    Object[][] output = new Object[rows.size()][];
-    int i = 0;
-    while (itor.hasNext()) {
-      output[i++] = itor.next().getData();
-    }
-    return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
deleted file mode 100644
index 71df3e0..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
-  SUCCESS, FAILURE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
deleted file mode 100644
index 97951ea..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search._
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.worker.Status
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-@InterfaceAudience.Internal
-class Master(sparkConf: SparkConf) {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  // worker host address map to EndpointRef
-
-  private val random = new Random
-
-  private var rpcEnv: RpcEnv = _
-
-  private val scheduler: Scheduler = new Scheduler
-
-  /** start service and listen on port passed in constructor */
-  def startService(): Unit = {
-    if (rpcEnv == null) {
-      LOG.info("Start search mode master thread")
-      val isStarted: AtomicBoolean = new AtomicBoolean(false)
-      new Thread(new Runnable {
-        override def run(): Unit = {
-          val hostAddress = InetAddress.getLocalHost.getHostAddress
-          var port = CarbonProperties.getSearchMasterPort
-          var exception: BindException = null
-          var numTry = 100  // we will try to create service at worse case 100 times
-          do {
-            try {
-              LOG.info(s"starting registry-service on $hostAddress:$port")
-              val config = RpcUtil.getRpcEnvConfig(
-                sparkConf, "registry-service", hostAddress, "", port,
-                new SecurityManager(sparkConf), clientMode = false)
-              rpcEnv = new NettyRpcEnvFactory().create(config)
-              numTry = 0
-            } catch {
-              case e: BindException =>
-                // port is occupied, increase the port number and try again
-                exception = e
-                LOG.error(s"start registry-service failed: ${e.getMessage}")
-                port = port + 1
-                numTry = numTry - 1
-            }
-          } while (numTry > 0)
-          if (rpcEnv == null) {
-            // we have tried many times, but still failed to find an available port
-            throw exception
-          }
-          val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
-          rpcEnv.setupEndpoint("registry-service", registryEndpoint)
-          if (isStarted.compareAndSet(false, false)) {
-            synchronized {
-              isStarted.compareAndSet(false, true)
-            }
-          }
-          LOG.info("registry-service started")
-          rpcEnv.awaitTermination()
-        }
-      }).start()
-      var count = 0
-      val countThreshold = 5000
-      while (isStarted.compareAndSet(false, false) && count < countThreshold) {
-        LOG.info(s"Waiting search mode master to start, retrying $count times")
-        Thread.sleep(10)
-        count = count + 1;
-      }
-      if (count >= countThreshold) {
-        LOG.error(s"Search mode try $countThreshold times to start master but failed")
-        throw new RuntimeException(
-          s"Search mode try $countThreshold times to start master but failed")
-      } else {
-        LOG.info("Search mode master started")
-      }
-    } else {
-      LOG.info("Search mode master has already started")
-    }
-  }
-
-  def stopService(): Unit = {
-    if (rpcEnv != null) {
-      rpcEnv.shutdown()
-      rpcEnv = null
-    }
-  }
-
-  def stopAllWorkers(): Unit = {
-    val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
-      (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
-    }
-    futures.foreach { case (address, future) =>
-      ThreadUtils.awaitResult(future, Duration.apply("10s"))
-      future.value match {
-        case Some(result) =>
-          result match {
-            case Success(response) => scheduler.removeWorker(address)
-            case Failure(throwable) => throw new IOException(throwable.getMessage)
-          }
-        case None => throw new ExecutionTimeoutException
-      }
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
-    LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
-             s"with ${request.cores} cores")
-    val workerId = UUID.randomUUID().toString
-    val workerAddress = request.hostAddress
-    val workerPort = request.port
-    LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
-
-    val endPointRef =
-      rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
-    scheduler.addWorker(workerAddress,
-      new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
-    LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
-    RegisterWorkerResponse(workerId)
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   * @param table table to search
-   * @param columns projection column names
-   * @param filter filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit max number of rows required in Worker
-   * @return
-   */
-  def search(table: CarbonTable, columns: Array[String], filter: Expression,
-      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
-    Objects.requireNonNull(table)
-    Objects.requireNonNull(columns)
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive")
-    }
-
-    val queryId = random.nextInt
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
-
-    def onSuccess(result: SearchResult): Unit = {
-      // in case of RPC success, collect all rows in response message
-      if (result.queryId != queryId) {
-        throw new IOException(
-          s"queryId in response does not match request: ${result.queryId} != $queryId")
-      }
-      if (result.status != Status.SUCCESS.ordinal()) {
-        throw new IOException(s"failure in worker: ${ result.message }")
-      }
-
-      val itor = result.rows.iterator
-      while (itor.hasNext && rowCount < globalLimit) {
-        output += new CarbonRow(itor.next())
-        rowCount = rowCount + 1
-      }
-      LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
-    }
-    def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
-    def onTimedout() = throw new ExecutionTimeoutException()
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the SearchRequest and fire the RPC call
-    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
-      // Build a SearchRequest
-      val split = new SerializableWritable[CarbonMultiBlockSplit](
-        new CarbonMultiBlockSplit(blocks, splitAddress))
-      val request =
-        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
-      // Find an Endpoind and send the request to it
-      // This RPC is non-blocking so that we do not need to wait before send to next worker
-      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
-    }
-
-    // loop to get the result of each Worker
-    tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
-
-      // if we have enough data already, we do not need to collect more result
-      if (rowCount < globalLimit) {
-        // wait for worker
-        val timeout = CarbonProperties
-          .getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
-            CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
-        ThreadUtils.awaitResult(future, Duration.apply(timeout))
-        LOG.info(s"[SearchId:$queryId] receive search response from worker " +
-          s"${worker.address}:${worker.port}")
-        try {
-          future.value match {
-            case Some(response: Try[SearchResult]) =>
-              response match {
-                case Success(result) => onSuccess(result)
-                case Failure(e) => onFaiure(e)
-              }
-            case None => onTimedout()
-          }
-        } finally {
-          worker.workload.decrementAndGet()
-        }
-      }
-    }
-    output.toArray
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of host address to list of block
-   */
-  private def pruneBlock(
-      table: CarbonTable,
-      columns: Array[String],
-      filter: Expression): JMap[String, JList[Distributable]] = {
-    val jobConf = new JobConf(new Configuration)
-    val job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
-      job, table, columns, filter, null, null)
-
-    // We will do FG pruning in reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
-    val splits = format.getSplits(job)
-    val distributables = splits.asScala.map { split =>
-      split.asInstanceOf[Distributable]
-    }
-    CarbonLoaderUtil.nodeBlockMapping(
-      distributables.asJava,
-      -1,
-      getWorkers.asJava,
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
-      null)
-  }
-
-  /** return hostname of all workers */
-  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception if execution timed out in search mode
-class ExecutionTimeoutException extends RuntimeException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala b/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
deleted file mode 100644
index f15bb8f..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
-import org.apache.spark.util.Utils
-
-object RpcUtil {
-
-  def getRpcEnvConfig(conf: SparkConf,
-      name: String,
-      bindAddress: String,
-      advertiseAddress: String,
-      port: Int,
-      securityManager: SecurityManager,
-      clientMode: Boolean): RpcEnvConfig = {
-    val className = "org.apache.spark.rpc.RpcEnvConfig"
-    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      createObject(className, conf, name, bindAddress,
-        advertiseAddress, port.asInstanceOf[Object],
-        securityManager, clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
-    } else if (SPARK_VERSION.startsWith("2.3")) {
-      // numUsableCores if it is 0 then spark will consider the available CPUs on the host.
-      val numUsableCores: Int = 0
-      createObject(className, conf, name, bindAddress,
-        advertiseAddress, port.asInstanceOf[Object],
-        securityManager, numUsableCores.asInstanceOf[Object],
-        clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
-  }
-
-  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
-    val clazz = Utils.classForName(className)
-    val ctor = clazz.getConstructors.head
-    ctor.setAccessible(true)
-    (ctor.newInstance(conArgs: _*), clazz)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
deleted file mode 100644
index 26208d0..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request
- */
-private[rpc] class Scheduler {
-  // mapping of worker IP address to worker instance
-  private val workers = mutable.Map[String, Schedulable]()
-  private val random = new Random()
-
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Pick a Worker according to the address and workload of the Worker
-   * Invoke the RPC and return Future result
-   */
-  def sendRequestAsync[T: ClassTag](
-      splitAddress: String,
-      request: Any): (Schedulable, Future[T]) = {
-    require(splitAddress != null)
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
-    var worker = pickWorker(splitAddress)
-
-    // check whether worker exceed max workload, if exceeded, pick next worker
-    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
-    var numTry = workers.size
-    do {
-      if (worker.workload.get() >= maxWorkload) {
-        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
-        worker = pickNextWorker(worker)
-        numTry = numTry - 1
-      } else {
-        numTry = -1
-      }
-    } while (numTry > 0)
-    if (numTry == 0) {
-      // tried so many times and still not able to find Worker
-      throw new WorkerTooBusyException(
-        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
-    }
-    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
-    val future = worker.ref.ask(request)
-    worker.workload.incrementAndGet()
-    (worker, future)
-  }
-
-  private def pickWorker[T: ClassTag](splitAddress: String) = {
-    try {
-      workers(splitAddress)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        pickRandomWorker()
-    }
-  }
-
-  /** pick a worker randomly */
-  private def pickRandomWorker() = {
-    val index = random.nextInt(workers.size)
-    workers.toSeq(index)._2
-  }
-
-  /** pick the next worker of the input worker in the [[Scheduler.workers]] */
-  private def pickNextWorker(worker: Schedulable) = {
-    val index = workers.zipWithIndex.find { case ((address, w), index) =>
-      w == worker
-    }.get._2
-    if (index == workers.size - 1) {
-      workers.toSeq.head._2
-    } else {
-      workers.toSeq(index + 1)._2
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(address: String, schedulable: Schedulable): Unit = {
-    require(schedulable != null)
-    require(address.equals(schedulable.address))
-    workers(address) = schedulable
-  }
-
-  def removeWorker(address: String): Unit = {
-    workers.remove(address)
-  }
-
-  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represent a Worker which [[Scheduler]] can send
- * Search request on it
- * @param id Worker ID, a UUID string
- * @param cores, number of cores in Worker
- * @param ref RPC endpoint reference
- * @param workload number of outstanding request sent to Worker
- */
-private[rpc] class Schedulable(
-    val id: String,
-    val address: String,
-    val port: Int,
-    val cores: Int,
-    val ref: RpcEndpointRef,
-    var workload: AtomicInteger) {
-  def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
-    this(id, address, port, cores, ref, new AtomicInteger())
-  }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
deleted file mode 100644
index 08baeeb..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-@InterfaceAudience.Internal
-object Worker {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private val hostAddress = InetAddress.getLocalHost.getHostAddress
-  private var port: Int = _
-
-  def init(masterHostAddress: String, masterPort: Int): Unit = {
-    LOG.info(s"initializing worker...")
-    startService()
-    LOG.info(s"registering to master $masterHostAddress:$masterPort")
-    val workerId = registerToMaster(masterHostAddress, masterPort)
-    LOG.info(s"worker registered to master, workerId: $workerId")
-  }
-
-  /**
-   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
-   */
-  private def startService(): Unit = {
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        port = CarbonProperties.getSearchWorkerPort
-        val conf = new SparkConf()
-        var rpcEnv: RpcEnv = null
-        var exception: BindException = null
-        var numTry = 100  // we will try to create service at worse case 100 times
-        do {
-          try {
-            LOG.info(s"starting search-service on $hostAddress:$port")
-            val config = RpcUtil.getRpcEnvConfig(
-              conf, s"worker-$hostAddress", hostAddress, "", port,
-              new SecurityManager(conf), clientMode = false)
-            rpcEnv = new NettyRpcEnvFactory().create(config)
-            numTry = 0
-          } catch {
-            case e: BindException =>
-              // port is occupied, increase the port number and try again
-              exception = e
-              LOG.error(s"start search-service failed: ${e.getMessage}")
-              port = port + 1
-              numTry = numTry - 1
-          }
-        } while (numTry > 0)
-        if (rpcEnv == null) {
-          // we have tried many times, but still failed to find an available port
-          throw exception
-        }
-        val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
-        rpcEnv.setupEndpoint("search-service", searchEndpoint)
-        LOG.info("search-service started")
-        rpcEnv.awaitTermination()
-      }
-    }).start()
-  }
-
-  private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
-    LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
-    val conf = new SparkConf()
-    val config = RpcUtil.getRpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
-      new SecurityManager(conf), clientMode = true)
-    val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
-
-    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
-      RpcAddress(masterHostAddress, masterPort), "registry-service")
-    val cores = Runtime.getRuntime.availableProcessors()
-
-    val request = RegisterWorkerRequest(hostAddress, port, cores)
-    val future = endPointRef.ask[RegisterWorkerResponse](request)
-    ThreadUtils.awaitResult(future, Duration.apply("10s"))
-    future.value match {
-      case Some(result) =>
-        result match {
-          case Success(response) =>
-            LOG.info("worker registered")
-            response.workerId
-          case Failure(throwable) =>
-            LOG.error(s"worker failed to registered: $throwable")
-            throw new IOException(throwable.getMessage)
-        }
-      case None =>
-        LOG.error("worker register timeout")
-        throw new ExecutionTimeoutException
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
deleted file mode 100644
index 22e766d..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * Registry service implementation. It adds worker to master.
- */
-class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  override def onStart(): Unit = {
-    LOG.info("Registry Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req@RegisterWorkerRequest(_, _, _) =>
-      val response = master.addWorker(req)
-      context.reply(response)
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Registry Endpoint stopped")
-  }
-
-}
-
-case class RegisterWorkerRequest(
-    hostAddress: String,
-    port: Int,
-    cores: Int)
-
-case class RegisterWorkerResponse(
-    workerId: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
deleted file mode 100644
index 6fbea15..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.SerializableWritable
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.store.worker.SearchRequestHandler
-
-/**
- * Search service implementation
- */
-class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def onStart(): Unit = {
-    LOG.info("Searcher Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req: SearchRequest =>
-      val response = new SearchRequestHandler().handleSearch(req)
-      context.reply(response)
-
-    case req: ShutdownRequest =>
-      val response = new SearchRequestHandler().handleShutdown(req)
-      context.reply(response)
-
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Searcher Endpoint stopped")
-  }
-}
-
-// Search request sent from master to worker
-case class SearchRequest(
-    searchId: Int,
-    split: SerializableWritable[CarbonMultiBlockSplit],
-    tableInfo: TableInfo,
-    projectColumns: Array[String],
-    filterExpression: Expression,
-    limit: Long)
-
-// Search result sent from worker to master
-case class SearchResult(
-    queryId: Int,
-    status: Int,
-    message: String,
-    rows: Array[Array[Object]])
-
-// Shutdown request sent from master to worker
-case class ShutdownRequest(
-    reason: String)
-
-// Shutdown response sent from worker to master
-case class ShutdownResponse(
-    status: Int,
-    message: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
deleted file mode 100644
index 88d925f..0000000
--- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-public class SearchServiceTest {
-//  @Test
-//  public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
-//    Master master = new Master(9999);
-//    master.startService();
-//
-//    Worker worker = Worker.getInstance();
-//    worker.init(InetAddress.getLocalHost().getHostName(), 9999);
-//
-//    Set<String> workers = master.getWorkers();
-//    Assert.assertEquals(1, workers.size());
-//    Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
-//
-//    master.stopAllWorkers();
-//    master.stopService();
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
deleted file mode 100644
index 8780dc0..0000000
--- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
-  var scheduler: Scheduler = _
-  var w1: Schedulable = _
-  var w2: Schedulable = _
-  var w3: Schedulable = _
-
-  override def beforeEach(): Unit = {
-    scheduler = new Scheduler()
-    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
-    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
-    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
-    scheduler.addWorker("1.1.1.1", w1)
-    scheduler.addWorker("1.1.1.2", w2)
-    scheduler.addWorker("1.1.1.3", w3)
-  }
-
-  test("test addWorker, removeWorker, getAllWorkers") {
-    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    scheduler.removeWorker("1.1.1.2")
-    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
-    scheduler.addWorker("1.1.1.4", w4)
-    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
-    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
-  }
-
-  test("test normal schedule") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r1.id)
-    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r2.id)
-    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r3.id)
-    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r4.id)
-    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r5.id)
-    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r6.id)
-  }
-
-  test("test worker unavailable") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
-    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
-  }
-
-  test("test reschedule when target worker is overload") {
-    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    // it must be worker1 since worker3 exceed max workload
-    assertResult(w1.id)(r.id)
-  }
-
-  test("test all workers are overload") {
-    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-  }
-
-  test("test user configured overload param") {
-    val original = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
-    (1 to 3).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    if (original != null) {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
-    }
-  }
-
-  test("test invalid property") {
-    intercept[IllegalArgumentException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
-    }
-    var value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-
-    intercept[NumberFormatException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
-    }
-    value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-  }
-}
-
-class DummyRef extends RpcEndpointRef(new SparkConf) {
-  override def address: RpcAddress = null
-
-  override def name: String = ""
-
-  override def send(message: Any): Unit = { }
-
-  override def ask[T](message: Any, timeout: RpcTimeout)
-    (implicit evidence$1: ClassTag[T]): Future[T] = null
-}
\ No newline at end of file