Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/06/25 01:48:42 UTC

[1/3] carbondata git commit: [CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework

Repository: carbondata
Updated Branches:
  refs/heads/carbonstore d5e86db52 -> fa111380f


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
deleted file mode 100644
index 26208d0..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send requests to
- */
-private[rpc] class Scheduler {
-  // mapping of worker IP address to worker instance
-  private val workers = mutable.Map[String, Schedulable]()
-  private val random = new Random()
-
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Pick a Worker according to the address and workload of the Worker
-   * Invoke the RPC and return Future result
-   */
-  def sendRequestAsync[T: ClassTag](
-      splitAddress: String,
-      request: Any): (Schedulable, Future[T]) = {
-    require(splitAddress != null)
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
-    var worker = pickWorker(splitAddress)
-
-    // check whether worker exceeds max workload; if exceeded, pick next worker
-    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
-    var numTry = workers.size
-    do {
-      if (worker.workload.get() >= maxWorkload) {
-        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
-        worker = pickNextWorker(worker)
-        numTry = numTry - 1
-      } else {
-        numTry = -1
-      }
-    } while (numTry > 0)
-    if (numTry == 0) {
-      // tried so many times and still not able to find Worker
-      throw new WorkerTooBusyException(
-        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
-    }
-    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
-    val future = worker.ref.ask(request)
-    worker.workload.incrementAndGet()
-    (worker, future)
-  }
-
-  private def pickWorker[T: ClassTag](splitAddress: String) = {
-    try {
-      workers(splitAddress)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        pickRandomWorker()
-    }
-  }
-
-  /** pick a worker randomly */
-  private def pickRandomWorker() = {
-    val index = random.nextInt(workers.size)
-    workers.toSeq(index)._2
-  }
-
-  /** pick the worker after the given worker in [[Scheduler.workers]] */
-  private def pickNextWorker(worker: Schedulable) = {
-    val index = workers.zipWithIndex.find { case ((address, w), index) =>
-      w == worker
-    }.get._2
-    if (index == workers.size - 1) {
-      workers.toSeq.head._2
-    } else {
-      workers.toSeq(index + 1)._2
-    }
-  }
-
-  /** A new searcher is trying to register; add it to the map and connect to it */
-  def addWorker(address: String, schedulable: Schedulable): Unit = {
-    require(schedulable != null)
-    require(address.equals(schedulable.address))
-    workers(address) = schedulable
-  }
-
-  def removeWorker(address: String): Unit = {
-    workers.remove(address)
-  }
-
-  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represents a Worker to which [[Scheduler]] can send
- * search requests
- * @param id Worker ID, a UUID string
- * @param cores number of cores in Worker
- * @param ref RPC endpoint reference
- * @param workload number of outstanding requests sent to Worker
- */
-private[rpc] class Schedulable(
-    val id: String,
-    val address: String,
-    val port: Int,
-    val cores: Int,
-    val ref: RpcEndpointRef,
-    var workload: AtomicInteger) {
-  def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
-    this(id, address, port, cores, ref, new AtomicInteger())
-  }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)
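
For reference, the policy this deleted Scheduler implemented (and which the new org.apache.carbondata.store.Scheduler carries forward) is locality-first, workload-capped selection: prefer the worker co-located with the split, fall back to a random worker, skip any worker whose outstanding request count has reached its cap (cores * 10 by default), and fail once every worker is saturated. A minimal Java sketch of that policy, using an illustrative Worker stand-in rather than the real Schedulable class:

    import java.io.IOException;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;

    class Worker {
      final String address;
      final int cores;
      final AtomicInteger workload = new AtomicInteger();
      Worker(String address, int cores) { this.address = address; this.cores = cores; }
    }

    class WorkloadCappedPicker {
      private final List<Worker> workers;
      private final Random random = new Random();

      WorkloadCappedPicker(List<Worker> workers) { this.workers = workers; }

      Worker pick(String splitAddress) throws IOException {
        if (workers.isEmpty()) throw new IOException("No worker is available");
        // locality first: prefer the worker on the same host as the split,
        // otherwise start from a random worker
        int start = 0;
        while (start < workers.size() && !workers.get(start).address.equals(splitAddress)) {
          start++;
        }
        if (start == workers.size()) start = random.nextInt(workers.size());
        // walk the worker list at most once, skipping saturated workers
        for (int tried = 0; tried < workers.size(); tried++) {
          Worker w = workers.get((start + tried) % workers.size());
          if (w.workload.get() < w.cores * 10) {  // default cap: cores * 10
            w.workload.incrementAndGet();
            return w;
          }
        }
        throw new IOException("All workers are busy");
      }
    }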

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
deleted file mode 100644
index 0f2138a..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-@InterfaceAudience.Internal
-object Worker {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private val hostAddress = InetAddress.getLocalHost.getHostAddress
-  private var port: Int = _
-
-  def init(masterHostAddress: String, masterPort: Int): Unit = {
-    LOG.info(s"initializing worker...")
-    startService()
-    LOG.info(s"registering to master $masterHostAddress:$masterPort")
-    val workerId = registerToMaster(masterHostAddress, masterPort)
-    LOG.info(s"worker registered to master, workerId: $workerId")
-  }
-
-  /**
-   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
-   */
-  private def startService(): Unit = {
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        port = CarbonProperties.getSearchWorkerPort
-        val conf = new SparkConf()
-        var rpcEnv: RpcEnv = null
-        var exception: BindException = null
-        var numTry = 100  // we will try to create the service at worst 100 times
-        do {
-          try {
-            LOG.info(s"starting search-service on $hostAddress:$port")
-            val config = RpcEnvConfig(
-              conf, s"worker-$hostAddress", hostAddress, "", port,
-              new SecurityManager(conf), clientMode = false)
-            rpcEnv = new NettyRpcEnvFactory().create(config)
-            numTry = 0
-          } catch {
-            case e: BindException =>
-              // port is occupied, increase the port number and try again
-              exception = e
-              LOG.error(s"start search-service failed: ${e.getMessage}")
-              port = port + 1
-              numTry = numTry - 1
-          }
-        } while (numTry > 0)
-        if (rpcEnv == null) {
-          // we have tried many times, but still failed to find an available port
-          throw exception
-        }
-        val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
-        rpcEnv.setupEndpoint("search-service", searchEndpoint)
-        LOG.info("search-service started")
-        rpcEnv.awaitTermination()
-      }
-    }).start()
-  }
-
-  private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
-    LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
-    val conf = new SparkConf()
-    val config = RpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
-      new SecurityManager(conf), clientMode = true)
-    val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
-
-    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
-      RpcAddress(masterHostAddress, masterPort), "registry-service")
-    val cores = Runtime.getRuntime.availableProcessors()
-
-    val request = RegisterWorkerRequest(hostAddress, port, cores)
-    val future = endPointRef.ask[RegisterWorkerResponse](request)
-    ThreadUtils.awaitResult(future, Duration.apply("10s"))
-    future.value match {
-      case Some(result) =>
-        result match {
-          case Success(response) =>
-            LOG.info("worker registered")
-            response.workerId
-          case Failure(throwable) =>
-            LOG.error(s"worker failed to registered: $throwable")
-            throw new IOException(throwable.getMessage)
-        }
-      case None =>
-        LOG.error("worker register timeout")
-        throw new ExecutionTimeoutException
-    }
-  }
-}
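
The startService loop above is a standard bind-retry idiom: attempt the configured port and, on BindException, bump the port and retry a bounded number of times, rethrowing the last failure if no port binds. The same idiom in self-contained Java, using a plain ServerSocket in place of the Netty RpcEnv:

    import java.io.IOException;
    import java.net.BindException;
    import java.net.ServerSocket;

    public class BindWithRetry {
      // Try ports [startPort, startPort + maxTries) and return the first socket that binds.
      static ServerSocket bind(int startPort, int maxTries) throws IOException {
        if (maxTries <= 0) throw new IllegalArgumentException("maxTries must be positive");
        BindException last = null;
        for (int i = 0; i < maxTries; i++) {
          try {
            return new ServerSocket(startPort + i);
          } catch (BindException e) {
            last = e;  // port occupied, try the next one
          }
        }
        throw last;  // all candidate ports were occupied
      }

      public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bind(10020, 100)) {
          System.out.println("listening on port " + socket.getLocalPort());
        }
      }
    }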

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
deleted file mode 100644
index 22e766d..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * Registry service implementation. It adds worker to master.
- */
-class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  override def onStart(): Unit = {
-    LOG.info("Registry Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req@RegisterWorkerRequest(_, _, _) =>
-      val response = master.addWorker(req)
-      context.reply(response)
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Registry Endpoint stopped")
-  }
-
-}
-
-case class RegisterWorkerRequest(
-    hostAddress: String,
-    port: Int,
-    cores: Int)
-
-case class RegisterWorkerResponse(
-    workerId: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
deleted file mode 100644
index 6fbea15..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.SerializableWritable
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.store.worker.SearchRequestHandler
-
-/**
- * Search service implementation
- */
-class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def onStart(): Unit = {
-    LOG.info("Searcher Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req: SearchRequest =>
-      val response = new SearchRequestHandler().handleSearch(req)
-      context.reply(response)
-
-    case req: ShutdownRequest =>
-      val response = new SearchRequestHandler().handleShutdown(req)
-      context.reply(response)
-
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Searcher Endpoint stopped")
-  }
-}
-
-// Search request sent from master to worker
-case class SearchRequest(
-    searchId: Int,
-    split: SerializableWritable[CarbonMultiBlockSplit],
-    tableInfo: TableInfo,
-    projectColumns: Array[String],
-    filterExpression: Expression,
-    limit: Long)
-
-// Search result sent from worker to master
-case class SearchResult(
-    queryId: Int,
-    status: Int,
-    message: String,
-    rows: Array[Array[Object]])
-
-// Shutdown request sent from master to worker
-case class ShutdownRequest(
-    reason: String)
-
-// Shutdown response sent from worker to master
-case class ShutdownResponse(
-    status: Int,
-    message: String)
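
The deleted endpoint above is a typed request/reply dispatcher: each message class maps to exactly one handler, and the reply goes back on the calling context. The replacement Hadoop RPC code expresses the same contract through Java service interfaces (QueryService, RegistryService, per the diffstat below) rather than pattern matching. The dispatch shape itself, sketched in Java with illustrative stand-in types:

    // Illustrative only; the committed code uses Hadoop RPC service
    // interfaces, not a hand-rolled dispatcher like this.
    interface ReplyContext { void reply(Object response); }
    class SearchRequest { }
    class ShutdownRequest { }

    class SearchEndpoint {
      Object handleSearch(SearchRequest req) { return "rows"; }
      Object handleShutdown(ShutdownRequest req) { return "stopped"; }

      void receiveAndReply(Object message, ReplyContext context) {
        // one handler per message type, reply on the calling context
        if (message instanceof SearchRequest) {
          context.reply(handleSearch((SearchRequest) message));
        } else if (message instanceof ShutdownRequest) {
          context.reply(handleShutdown((ShutdownRequest) message));
        } else {
          throw new IllegalArgumentException("unknown message: " + message);
        }
      }
    }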

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
deleted file mode 100644
index 88d925f..0000000
--- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-public class SearchServiceTest {
-//  @Test
-//  public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
-//    Master master = new Master(9999);
-//    master.startService();
-//
-//    Worker worker = Worker.getInstance();
-//    worker.init(InetAddress.getLocalHost().getHostName(), 9999);
-//
-//    Set<String> workers = master.getWorkers();
-//    Assert.assertEquals(1, workers.size());
-//    Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
-//
-//    master.stopAllWorkers();
-//    master.stopService();
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
deleted file mode 100644
index 8780dc0..0000000
--- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
-  var scheduler: Scheduler = _
-  var w1: Schedulable = _
-  var w2: Schedulable = _
-  var w3: Schedulable = _
-
-  override def beforeEach(): Unit = {
-    scheduler = new Scheduler()
-    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
-    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
-    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
-    scheduler.addWorker("1.1.1.1", w1)
-    scheduler.addWorker("1.1.1.2", w2)
-    scheduler.addWorker("1.1.1.3", w3)
-  }
-
-  test("test addWorker, removeWorker, getAllWorkers") {
-    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    scheduler.removeWorker("1.1.1.2")
-    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
-    scheduler.addWorker("1.1.1.4", w4)
-    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
-    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
-  }
-
-  test("test normal schedule") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r1.id)
-    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r2.id)
-    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r3.id)
-    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r4.id)
-    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r5.id)
-    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r6.id)
-  }
-
-  test("test worker unavailable") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
-    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
-  }
-
-  test("test reschedule when target worker is overload") {
-    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    // it must be worker1 since worker3 exceeded its max workload
-    assertResult(w1.id)(r.id)
-  }
-
-  test("test all workers are overload") {
-    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-  }
-
-  test("test user configured overload param") {
-    val original = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
-    (1 to 3).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    if (original != null) {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
-    }
-  }
-
-  test("test invalid property") {
-    intercept[IllegalArgumentException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
-    }
-    var value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-
-    intercept[NumberFormatException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
-    }
-    value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-  }
-}
-
-class DummyRef extends RpcEndpointRef(new SparkConf) {
-  override def address: RpcAddress = null
-
-  override def name: String = ""
-
-  override def send(message: Any): Unit = { }
-
-  override def ask[T](message: Any, timeout: RpcTimeout)
-    (implicit evidence$1: ClassTag[T]): Future[T] = null
-}
\ No newline at end of file


[3/3] carbondata git commit: [CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework

Posted by qi...@apache.org.
[CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework

This closes #2372


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa111380
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa111380
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa111380

Branch: refs/heads/carbonstore
Commit: fa111380fc46c955330bc47b802844e576b6524f
Parents: d5e86db
Author: Jacky Li <ja...@qq.com>
Authored: Wed Jun 13 23:57:00 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Jun 25 09:46:17 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +-
 .../carbondata/core/scan/model/QueryModel.java  |  14 +-
 .../carbondata/core/util/CarbonProperties.java  |  10 +
 .../core/util/ObjectSerializationUtil.java      |  14 +
 .../carbondata/hadoop/CarbonRecordReader.java   |   8 +-
 .../detailquery/SearchModeTestCase.scala        |  17 +-
 integration/spark2/pom.xml                      |   2 +-
 .../carbondata/store/SparkCarbonStore.scala     |  27 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   1 +
 pom.xml                                         |   2 +-
 store/core/pom.xml                              | 113 +++++++
 .../carbondata/store/CarbonRowReadSupport.java  |  53 ++++
 .../apache/carbondata/store/CarbonStore.java    |  68 +++++
 .../carbondata/store/LocalCarbonStore.java      | 130 +++++++++
 .../carbondata/store/MetaCachedCarbonStore.java |  59 ++++
 .../carbondata/store/rpc/QueryService.java      |  33 +++
 .../carbondata/store/rpc/RegistryService.java   |  30 ++
 .../carbondata/store/rpc/ServiceFactory.java    |  43 +++
 .../store/rpc/impl/IndexedRecordReader.java     | 161 ++++++++++
 .../store/rpc/impl/QueryServiceImpl.java        |  56 ++++
 .../store/rpc/impl/RegistryServiceImpl.java     |  54 ++++
 .../store/rpc/impl/RequestHandler.java          | 147 ++++++++++
 .../carbondata/store/rpc/impl/Status.java       |  28 ++
 .../store/rpc/model/QueryRequest.java           | 108 +++++++
 .../store/rpc/model/QueryResponse.java          |  84 ++++++
 .../store/rpc/model/RegisterWorkerRequest.java  |  69 +++++
 .../store/rpc/model/RegisterWorkerResponse.java |  54 ++++
 .../store/rpc/model/ShutdownRequest.java        |  53 ++++
 .../store/rpc/model/ShutdownResponse.java       |  61 ++++
 .../org/apache/carbondata/store/Master.scala    | 283 ++++++++++++++++++
 .../org/apache/carbondata/store/Scheduler.scala | 147 ++++++++++
 .../org/apache/carbondata/store/Worker.scala    | 113 +++++++
 .../carbondata/store/LocalCarbonStoreTest.java  |  72 +++++
 .../org/apache/carbondata/store/TestUtil.java   | 168 +++++++++++
 .../carbondata/store/SchedulerSuite.scala       | 155 ++++++++++
 .../carbondata/store/CarbonRowReadSupport.java  |  53 ----
 .../apache/carbondata/store/CarbonStore.java    |  68 -----
 .../carbondata/store/LocalCarbonStore.java      | 130 ---------
 .../carbondata/store/MetaCachedCarbonStore.java |  59 ----
 .../carbondata/store/LocalCarbonStoreTest.java  |  72 -----
 store/search/pom.xml                            | 112 -------
 .../store/worker/SearchRequestHandler.java      | 247 ----------------
 .../apache/carbondata/store/worker/Status.java  |  28 --
 .../scala/org/apache/spark/rpc/Master.scala     | 291 -------------------
 .../scala/org/apache/spark/rpc/Scheduler.scala  | 139 ---------
 .../scala/org/apache/spark/rpc/Worker.scala     | 118 --------
 .../org/apache/spark/search/Registry.scala      |  51 ----
 .../org/apache/spark/search/Searcher.scala      |  79 -----
 .../carbondata/store/SearchServiceTest.java     |  37 ---
 .../org/apache/spark/rpc/SchedulerSuite.scala   | 154 ----------
 50 files changed, 2402 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 118ff28..ff6b358 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1725,7 +1725,7 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
   /**
-   * It's timeout threshold of carbon search query
+   * It's timeout threshold of carbon search query, in seconds
    */
   @CarbonProperty
   @InterfaceStability.Unstable
@@ -1734,7 +1734,7 @@ public final class CarbonCommonConstants {
   /**
    * Default value is 10 seconds
    */
-  public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s";
+  public static final int CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = 10;
 
   /**
    * The size of thread pool used for reading files in Work for search mode. By default,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 55dafb9..b15ce02 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -69,6 +69,7 @@ public class QueryModel {
    * table block information in which query will be executed
    */
   private List<TableBlockInfo> tableBlockInfos;
+
   /**
    * To handle most of the computation in query engines like spark and hive, carbon should give
    * raw detailed records to it.
@@ -109,11 +110,6 @@ public class QueryModel {
    */
   private boolean requiredRowId;
 
-  /**
-   * whether it is FG with search mode
-   */
-  private boolean isFG;
-
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     invalidSegmentIds = new ArrayList<>();
@@ -375,14 +371,6 @@ public class QueryModel {
     this.requiredRowId = requiredRowId;
   }
 
-  public boolean isFG() {
-    return isFG;
-  }
-
-  public void setFG(boolean FG) {
-    isFG = FG;
-  }
-
   @Override
   public String toString() {
     return String.format("scan on table %s.%s, %d projection columns with filter (%s)",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index dc50ab0..574d175 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1599,4 +1599,14 @@ public final class CarbonProperties {
     }
     return storageLevel.toUpperCase();
   }
+
+  public int getQueryTimeout() {
+    try {
+      return Integer.parseInt(
+          CarbonProperties.getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT));
+    } catch (NumberFormatException e) {
+      return CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT;
+    }
+  }
 }
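
Note that getQueryTimeout falls back to the integer default whenever the property is unset or unparsable (Integer.parseInt(null) also throws NumberFormatException, so the unset case takes the same path). A caller reads it as a plain int of seconds:

    // Sketch of a call site; getQueryTimeout() returns seconds.
    int timeoutSeconds = CarbonProperties.getInstance().getQueryTimeout();
    long timeoutMillis = timeoutSeconds * 1000L;  // e.g. when waiting on an RPC future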

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
index 020787d..e133208 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
@@ -113,4 +113,18 @@ public class ObjectSerializationUtil {
     }
   }
 
+  public static byte[] serialize(Object object) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(object);
+    return baos.toByteArray();
+  }
+
+  public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+    if (bytes == null) {
+      return null;
+    }
+    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+    return ois.readObject();
+  }
 }
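
The new serialize/deserialize pair is plain Java serialization to and from a byte array, with deserialize returning null for null input and the cast left to the caller. A round-trip sketch, assuming a Serializable payload and an enclosing method that declares throws IOException, ClassNotFoundException:

    import java.io.Serializable;
    import java.util.Arrays;

    // Hypothetical payload type for illustration only.
    class Payload implements Serializable {
      final int[] values;
      Payload(int[] values) { this.values = values; }
    }

    byte[] bytes = ObjectSerializationUtil.serialize(new Payload(new int[]{1, 2, 3}));
    Payload copy = (Payload) ObjectSerializationUtil.deserialize(bytes);
    System.out.println(Arrays.toString(copy.values));  // prints [1, 2, 3]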

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 6b56382..3a0037f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -78,12 +78,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     } else {
       throw new RuntimeException("unsupported input split type: " + inputSplit);
     }
-    // It should use the exists tableBlockInfos if tableBlockInfos of queryModel is not empty
-    // otherwise the prune is no use before this method
-    if (!queryModel.isFG()) {
-      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
-      queryModel.setTableBlockInfos(tableBlockInfoList);
-    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
     readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
     try {
       carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 001f6c0..af9e50f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -60,6 +60,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SearchMode Query: row result") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
     checkSearchAnswer("select * from main where city = 'city3'")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
@@ -67,36 +68,44 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SearchMode Query: vector result") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select * from main where city = 'city3'")
   }
 
   test("equal filter") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where id = '100'")
     checkSearchAnswer("select id from main where planet = 'planet100'")
   }
 
   test("greater and less than filter") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where m2 < 4")
   }
 
   test("IN filter") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where id IN ('40', '50', '60')")
   }
 
   test("expression filter") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where length(id) < 2")
   }
 
   test("filter with limit") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where id = '3' limit 10")
     checkSearchAnswer("select id from main where length(id) < 2 limit 10")
   }
 
   test("aggregate query") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city")
   }
 
   test("aggregate query with datamap and fallback to SparkSQL") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
     checkSearchAnswer("select city, count(*) from main group by city")
     sql("drop datamap preagg on table main").show()
@@ -108,10 +117,11 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
     checkSearchAnswer("select id from main where id = '3' limit 10")
     sql("set carbon.search.enabled = false")
     assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
+    sql("set carbon.search.enabled = true")
   }
 
   test("test lucene datamap with search mode") {
-    sql("set carbon.search.enabled = true")
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     sql("DROP DATAMAP IF EXISTS dm ON TABLE main")
     sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ")
     checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
@@ -120,6 +130,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene datamap with search mode 2") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     sql("drop datamap if exists dm3 ON TABLE main")
     sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ")
     checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
@@ -128,6 +139,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene datamap with search mode, two column") {
+    assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     sql("drop datamap if exists dm3 ON TABLE main")
     sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ")
     checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
@@ -137,7 +149,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP DATAMAP if exists dm3 ON TABLE main")
   }
 
-  test("start search mode twice") {
+  ignore("start search mode twice") {
     sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
     assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where id = '3' limit 10")
@@ -148,6 +160,5 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
     sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
     assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
     checkSearchAnswer("select id from main where id = '3' limit 10")
-    sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9b9e71d..6b05800 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -41,7 +41,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-store-sdk</artifactId>
+      <artifactId>carbondata-store-core</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index 3a6adea..d99081d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -23,7 +23,6 @@ import java.net.InetAddress
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{CarbonInputMetrics, SparkConf}
-import org.apache.spark.rpc.{Master, Worker}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.SparkSession
 
@@ -111,24 +110,26 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
 
   def startSearchMode(): Unit = {
     LOG.info("Starting search mode master")
-    master = new Master(session.sparkContext.getConf)
+    master = new Master()
     master.startService()
     startAllWorkers()
   }
 
   def stopSearchMode(): Unit = {
-    LOG.info("Shutting down all workers...")
-    try {
-      master.stopAllWorkers()
-      LOG.info("All workers are shutted down")
-    } catch {
-      case e: Exception =>
-        LOG.error(s"failed to shutdown worker: ${e.toString}")
+    if (master != null) {
+      LOG.info("Shutting down all workers...")
+      try {
+        master.stopAllWorkers()
+        LOG.info("All workers are shut down")
+      } catch {
+        case e: Exception =>
+          LOG.error(s"failed to shutdown worker: ${ e.toString }")
+      }
+      LOG.info("Stopping master...")
+      master.stopService()
+      LOG.info("Master stopped")
+      master = null
     }
-    LOG.info("Stopping master...")
-    master.stopService()
-    LOG.info("Master stopped")
-    master = null
   }
 
   /** search mode */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 30cb464..7fdba89 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -214,6 +214,7 @@ class CarbonSession(@transient val sc: SparkContext,
         case e: RuntimeException =>
           LogServiceFactory.getLogService(this.getClass.getCanonicalName)
             .error(s"Stop search mode failed: ${e.getMessage}")
+          throw e
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1413fd1..5e81f2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
     <module>integration/spark-common-test</module>
     <module>datamap/examples</module>
     <module>store/sdk</module>
-    <module>store/search</module>
+    <module>store/core</module>
     <module>assembly</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
new file mode 100644
index 0000000..0bee84f
--- /dev/null
+++ b/store/core/pom.xml
@@ -0,0 +1,113 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-store-core</artifactId>
+  <name>Apache CarbonData :: Store Core </name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
new file mode 100644
index 0000000..bafbb9f
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+
+/**
+ * ReadSupport that converts row objects to CarbonRow
+ */
+@InterfaceAudience.Internal
+public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
+  private CarbonReadSupport<Object[]> delegate;
+
+  public CarbonRowReadSupport() {
+    this.delegate = new DictionaryDecodeReadSupport<>();
+  }
+
+  @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
+      throws IOException {
+    delegate.initialize(carbonColumns, carbonTable);
+  }
+
+  @Override public CarbonRow readRow(Object[] data) {
+    Object[] converted = delegate.readRow(data);
+    return new CarbonRow(converted);
+  }
+
+  @Override public void close() {
+    delegate.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
new file mode 100644
index 0000000..c6b2fb8
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.scan.expression.Expression;
+
+/**
+ * Users can use {@link CarbonStore} to query data
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends Closeable {
+
+  /**
+   * Scan query on the data in the table path
+   * @param path table path
+   * @param projectColumns column names to read
+   * @return rows
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> scan(
+      String path,
+      String[] projectColumns) throws IOException;
+
+  /**
+   * Scan query with filter, on the data in the table path
+   * @param path table path
+   * @param projectColumns column names to read
+   * @param filter filter condition, can be null
+   * @return rows that satisfy filter condition
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> scan(
+      String path,
+      String[] projectColumns,
+      Expression filter) throws IOException;
+
+  /**
+   * SQL query, table should be created before calling this function
+   * @param sqlString SQL statement
+   * @return rows
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> sql(String sqlString) throws IOException;
+
+}
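
Since the LocalCarbonStore implementation below is package-private, assume a CarbonStore instance obtained through whatever factory the store module exposes; a scan is then just an iteration over CarbonRow (getData() returns the row as Object[]):

    // Sketch only: "store" is an already-obtained CarbonStore instance.
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.carbondata.core.datastore.row.CarbonRow;

    void printRows(CarbonStore store, String tablePath) throws IOException {
      Iterator<CarbonRow> rows = store.scan(tablePath, new String[]{"id", "name"});
      while (rows.hasNext()) {
        Object[] data = rows.next().getData();
        System.out.println(data[0] + ", " + data[1]);
      }
      store.close();  // CarbonStore extends Closeable
    }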

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
new file mode 100644
index 0000000..daa1447
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * A CarbonStore implementation that works locally, without depending on an external
+ * compute framework. It can be used to read data from local disk.
+ *
+ * Note that this class is experimental; it is not intended for production use.
+ */
+@InterfaceAudience.Internal
+class LocalCarbonStore extends MetaCachedCarbonStore {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
+
+  @Override
+  public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException {
+    return scan(path, projectColumns, null);
+  }
+
+  @Override
+  public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
+      throws IOException {
+    Objects.requireNonNull(path);
+    Objects.requireNonNull(projectColumns);
+
+    CarbonTable table = getTable(path);
+    if (table.isStreamingSink() || table.isHivePartitionTable()) {
+      throw new UnsupportedOperationException("streaming and partition tables are not supported");
+    }
+    // TODO: use InputFormat to prune data and read data
+
+    final CarbonTableInputFormat format = new CarbonTableInputFormat();
+    final Job job = new Job(new Configuration());
+    CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo());
+    CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath());
+    CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
+    CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
+    CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
+    CarbonInputFormat
+        .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
+    if (filter != null) {
+      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+    }
+
+    final List<InputSplit> splits =
+        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+
+    List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
+
+    List<CarbonRow> rows = new ArrayList<>();
+
+    try {
+      for (InputSplit split : splits) {
+        TaskAttemptContextImpl attempt =
+            new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+        RecordReader reader = format.createRecordReader(split, attempt);
+        reader.initialize(split, attempt);
+        readers.add(reader);
+      }
+
+      for (RecordReader<Void, Object> reader : readers) {
+        while (reader.nextKeyValue()) {
+          rows.add((CarbonRow) reader.getCurrentValue());
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      // close every reader exactly once, whether or not the scan succeeded
+      for (RecordReader<Void, Object> reader : readers) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOGGER.error(e);
+        }
+      }
+    }
+    return rows.iterator();
+  }
+
+  @Override
+  public Iterator<CarbonRow> sql(String sqlString) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
new file mode 100644
index 0000000..e43f750
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * A CarbonStore base class that caches CarbonTable objects
+ */
+@InterfaceAudience.Internal
+abstract class MetaCachedCarbonStore implements CarbonStore {
+
+  // mapping of table path to CarbonTable object
+  private Map<String, CarbonTable> cache = new HashMap<>();
+
+  CarbonTable getTable(String path) throws IOException {
+    if (cache.containsKey(path)) {
+      return cache.get(path);
+    }
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo =
+        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
+    wrapperTableInfo.setTablePath(path);
+    CarbonTable table = CarbonTable.buildFromTableInfo(wrapperTableInfo);
+    cache.put(path, table);
+    return table;
+  }
+
+  @Override
+  public void close() throws IOException {
+    cache.clear();
+  }
+}
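
The table cache above is a plain HashMap, which is sufficient for single-threaded use. If concurrent access were ever required (an assumption, not something this patch needs), a thread-safe variant could look like the sketch below, where loadTable() is a hypothetical helper wrapping the readSchemaFile/convert/build steps shown above.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hedged sketch of a thread-safe table cache.
    private final Map<String, CarbonTable> cache = new ConcurrentHashMap<>();

    CarbonTable getTable(String path) throws IOException {
      try {
        return cache.computeIfAbsent(path, p -> {
          try {
            return loadTable(p);  // hypothetical helper, see the steps above
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
      } catch (UncheckedIOException e) {
        throw e.getCause();
      }
    }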

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
new file mode 100644
index 0000000..faaa746
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface QueryService extends VersionedProtocol {
+  long versionID = 1L;
+  QueryResponse query(QueryRequest request);
+  ShutdownResponse shutdown(ShutdownRequest request);
+}
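
Because QueryService extends Hadoop's VersionedProtocol, it can be served with Hadoop's RPC.Builder, mirroring what Master.scala does below for the registry service. A hedged Java sketch; the bind address and port are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    // Serving QueryService over Hadoop RPC (sketch only).
    RPC.Server server = new RPC.Builder(new Configuration())
        .setBindAddress("0.0.0.0")             // hypothetical bind address
        .setPort(10021)                        // hypothetical port
        .setProtocol(QueryService.class)
        .setInstance(new QueryServiceImpl())   // implementation added in this patch
        .build();
    server.start();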

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
new file mode 100644
index 0000000..4d17686
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface RegistryService extends VersionedProtocol {
+  long versionID = 1L;
+  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
new file mode 100644
index 0000000..a50ab8b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+@InterfaceAudience.Internal
+public class ServiceFactory {
+
+  public static QueryService createSearchService(String host, int port) throws IOException {
+    InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
+    return RPC.getProxy(
+        QueryService.class, QueryService.versionID, address, new Configuration());
+  }
+
+  public static RegistryService createRegistryService(String host, int port) throws IOException {
+    InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
+    return RPC.getProxy(
+        RegistryService.class, RegistryService.versionID, address, new Configuration());
+  }
+}
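
On the client side, the proxy returned by this factory is used like a local object. A hedged sketch of shutting down a worker; host and port are hypothetical:

    // Hedged usage sketch; host and port are hypothetical.
    QueryService searcher = ServiceFactory.createSearchService("192.168.0.5", 10021);
    ShutdownResponse response = searcher.shutdown(new ShutdownRequest("user"));
    System.out.println("worker shutdown status: " + response.getStatus());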

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
new file mode 100644
index 0000000..2c768d1
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a special RecordReader that leverages the FGDataMap before reading carbondata
+ * files and returns CarbonRow objects
+ */
+public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(IndexedRecordReader.class.getName());
+
+  private int queryId;
+  private CarbonTable table;
+
+  public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
+    super(queryModel, new CarbonRowReadSupport());
+    this.queryId = queryId;
+    this.table = table;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit;
+    List<CarbonInputSplit> splits = mbSplit.getAllSplits();
+    List<TableBlockInfo> blockInfos = CarbonInputSplit.createBlocks(splits);
+    queryModel.setTableBlockInfos(blockInfos);
+
+    // prune the blocks with the FGDataMap, if there is one, based on the filter condition
+    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
+        queryModel.getFilterExpressionResolverTree());
+    if (fgDataMap != null) {
+      queryModel = prune(table, queryModel, mbSplit, fgDataMap);
+    }
+
+    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+    try {
+      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
+    } catch (QueryExecutionException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+
+  private DataMapExprWrapper chooseFGDataMap(
+      CarbonTable table,
+      FilterResolverIntf filterInterface) {
+    DataMapChooser chooser = null;
+    try {
+      chooser = new DataMapChooser(table);
+      return chooser.chooseFGDataMap(filterInterface);
+    } catch (IOException e) {
+      LOG.error(e);
+      return null;
+    }
+  }
+
+  /**
+   * If there is an FGDataMap defined for this table and the query has a filter condition,
+   * prune the splits using the DataMap, set the pruned splits into the QueryModel and return it
+   */
+  private QueryModel prune(CarbonTable table, QueryModel queryModel,
+      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
+    Objects.requireNonNull(datamap);
+    List<Segment> segments = new LinkedList<>();
+    HashMap<String, Integer> uniqueSegments = new HashMap<>();
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(
+            CarbonTablePath.getMetadataPath(table.getTablePath()));
+    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
+      if (uniqueSegments.get(segmentId) == null) {
+        segments.add(Segment.toSegment(segmentId,
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                loadMetadataDetails)));
+        uniqueSegments.put(segmentId, 1);
+      } else {
+        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+      }
+    }
+
+    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+    List<ExtendedBlocklet> prunedBlocklets = new LinkedList<>();
+    for (int i = 0; i < distributables.size(); i++) {
+      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+      prunedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
+    }
+
+    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+    for (ExtendedBlocklet prunedBlocklet : prunedBlocklets) {
+      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
+    }
+
+    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+    List<TableBlockInfo> blockToRead = new LinkedList<>();
+    for (TableBlockInfo block : blocks) {
+      if (pathToRead.containsKey(block.getFilePath())) {
+        // If not set this, it won't create FineGrainBlocklet object in
+        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
+        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
+        blockToRead.add(block);
+      }
+    }
+    LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
+        blockToRead.size()));
+    queryModel.setTableBlockInfos(blockToRead);
+    return queryModel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
new file mode 100644
index 0000000..b191331
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.QueryService;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class QueryServiceImpl implements QueryService {
+
+  @Override
+  public QueryResponse query(QueryRequest request) {
+    RequestHandler handler = new RequestHandler();
+    return handler.handleSearch(request);
+  }
+
+  @Override
+  public ShutdownResponse shutdown(ShutdownRequest request) {
+    RequestHandler handler = new RequestHandler();
+    return handler.handleShutdown(request);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
new file mode 100644
index 0000000..12f48ba
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.Master;
+import org.apache.carbondata.store.rpc.RegistryService;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class RegistryServiceImpl implements RegistryService {
+
+  private Master master;
+
+  public RegistryServiceImpl(Master master) {
+    this.master = master;
+  }
+
+  @Override
+  public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) {
+    return master.addWorker(request);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
new file mode 100644
index 0000000..29ee546
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+/**
+ * It handles requests from the master.
+ */
+@InterfaceAudience.Internal
+class RequestHandler {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(RequestHandler.class.getName());
+
+  QueryResponse handleSearch(QueryRequest request) {
+    try {
+      LOG.info(String.format("[QueryId:%d] receive search request", request.getRequestId()));
+      List<CarbonRow> rows = handleRequest(request);
+      LOG.info(String.format("[QueryId:%d] sending success response", request.getRequestId()));
+      return createSuccessResponse(request, rows);
+    } catch (IOException e) {
+      LOG.error(e);
+      LOG.info(String.format("[QueryId:%d] sending failure response", request.getRequestId()));
+      return createFailureResponse(request, e);
+    }
+  }
+
+  ShutdownResponse handleShutdown(ShutdownRequest request) {
+    LOG.info("Shutting down worker...");
+    SearchModeDetailQueryExecutor.shutdownThreadPool();
+    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+    LOG.info("Worker shut down");
+    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+  }
+
+  /**
+   * Builds a {@link QueryModel} and reads data from files
+   */
+  private List<CarbonRow> handleRequest(QueryRequest request) throws IOException {
+    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+    carbonTaskInfo.setTaskId(System.nanoTime());
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
+    CarbonMultiBlockSplit mbSplit = request.getSplit();
+    long limit = request.getLimit();
+    TableInfo tableInfo = request.getTableInfo();
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    QueryModel queryModel = createQueryModel(table, request);
+
+    LOG.info(String.format("[QueryId:%d] %s, number of block: %d",
+        request.getRequestId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+
+    // read all rows by the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try (CarbonRecordReader<CarbonRow> reader =
+        new IndexedRecordReader(request.getRequestId(), table, queryModel)) {
+      reader.initialize(mbSplit, null);
+
+      // loop to read the required number of rows;
+      // if the user does not specify a limit, it defaults to Long.MaxValue
+      long rowCount = 0;
+      while (reader.nextKeyValue() && rowCount < limit) {
+        rows.add(reader.getCurrentValue());
+        rowCount++;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    LOG.info(String.format("[QueryId:%d] scan completed, return %d rows",
+        request.getRequestId(), rows.size()));
+    return rows;
+  }
+
+
+  private QueryModel createQueryModel(CarbonTable table, QueryRequest request) {
+    return new QueryModelBuilder(table)
+        .projectColumns(request.getProjectColumns())
+        .filterExpression(request.getFilterExpression())
+        .build();
+  }
+
+  /**
+   * create a failure response
+   */
+  private QueryResponse createFailureResponse(QueryRequest request, Throwable throwable) {
+    return new QueryResponse(request.getRequestId(), Status.FAILURE.ordinal(),
+        throwable.getMessage(), new Object[0][]);
+  }
+
+  /**
+   * create a success response with result rows
+   */
+  private QueryResponse createSuccessResponse(QueryRequest request, List<CarbonRow> rows) {
+    Iterator<CarbonRow> itor = rows.iterator();
+    Object[][] output = new Object[rows.size()][];
+    int i = 0;
+    while (itor.hasNext()) {
+      output[i++] = itor.next().getData();
+    }
+    return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
new file mode 100644
index 0000000..9bcd397
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Status of RPC response
+ */
+@InterfaceAudience.Internal
+public enum Status {
+  SUCCESS, FAILURE
+}
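
Since the status crosses the wire as a plain int (via ordinal()), the receiving side maps it back through Status.values(), as Master.scala does below when checking responses. A small sketch, where "response" stands for a received QueryResponse:

    // Decode the wire int back into the enum; "response" is hypothetical.
    Status status = Status.values()[response.getStatus()];
    if (status == Status.FAILURE) {
      throw new IOException(response.getMessage());
    }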

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
new file mode 100644
index 0000000..27dc38b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class QueryRequest implements Serializable, Writable {
+  private int requestId;
+  private CarbonMultiBlockSplit split;
+  private TableInfo tableInfo;
+  private String[] projectColumns;
+  private Expression filterExpression;
+  private long limit;
+
+  public QueryRequest() {
+  }
+
+  public QueryRequest(int requestId, CarbonMultiBlockSplit split,
+      TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) {
+    this.requestId = requestId;
+    this.split = split;
+    this.tableInfo = tableInfo;
+    this.projectColumns = projectColumns;
+    this.filterExpression = filterExpression;
+    this.limit = limit;
+  }
+
+  public int getRequestId() {
+    return requestId;
+  }
+
+  public CarbonMultiBlockSplit getSplit() {
+    return split;
+  }
+
+  public TableInfo getTableInfo() {
+    return tableInfo;
+  }
+
+  public String[] getProjectColumns() {
+    return projectColumns;
+  }
+
+  public Expression getFilterExpression() {
+    return filterExpression;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(requestId);
+    split.write(out);
+    tableInfo.write(out);
+    out.writeInt(projectColumns.length);
+    for (String projectColumn : projectColumns) {
+      out.writeUTF(projectColumn);
+    }
+    String filter = ObjectSerializationUtil.convertObjectToString(filterExpression);
+    out.writeUTF(filter);
+    out.writeLong(limit);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    requestId = in.readInt();
+    split = new CarbonMultiBlockSplit();
+    split.readFields(in);
+    tableInfo = new TableInfo();
+    tableInfo.readFields(in);
+    projectColumns = new String[in.readInt()];
+    for (int i = 0; i < projectColumns.length; i++) {
+      projectColumns[i] = in.readUTF();
+    }
+    String filter = in.readUTF();
+    filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter);
+    limit = in.readLong();
+  }
+}
\ No newline at end of file
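
As with any Hadoop Writable, write() and readFields() must mirror each other field by field. A hedged round-trip sketch using Hadoop's in-memory buffers, assuming "request" is a fully populated QueryRequest:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    // Serialize, then deserialize into a fresh instance; field order must match.
    DataOutputBuffer out = new DataOutputBuffer();
    request.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    QueryRequest copy = new QueryRequest();
    copy.readFields(in);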

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
new file mode 100644
index 0000000..033f1a5
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Internal
+public class QueryResponse implements Serializable, Writable {
+  private int queryId;
+  private int status;
+  private String message;
+  private Object[][] rows;
+
+  public QueryResponse() {
+  }
+
+  public QueryResponse(int queryId, int status, String message, Object[][] rows) {
+    this.queryId = queryId;
+    this.status = status;
+    this.message = message;
+    this.rows = rows;
+  }
+
+  public int getQueryId() {
+    return queryId;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public Object[][] getRows() {
+    return rows;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(queryId);
+    out.writeInt(status);
+    out.writeUTF(message);
+    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows));
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    queryId = in.readInt();
+    status = in.readInt();
+    message = in.readUTF();
+    try {
+      rows = (Object[][])ObjectSerializationUtil.deserialize(
+          WritableUtils.readCompressedByteArray(in));
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
new file mode 100644
index 0000000..894948b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerRequest implements Serializable, Writable {
+  private String hostAddress;
+  private int port;
+  private int cores;
+
+  public RegisterWorkerRequest() {
+  }
+
+  public RegisterWorkerRequest(String hostAddress, int port, int cores) {
+    this.hostAddress = hostAddress;
+    this.port = port;
+    this.cores = cores;
+  }
+
+  public String getHostAddress() {
+    return hostAddress;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public int getCores() {
+    return cores;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(hostAddress);
+    out.writeInt(port);
+    out.writeInt(cores);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    hostAddress = in.readUTF();
+    port = in.readInt();
+    cores = in.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
new file mode 100644
index 0000000..8465c90
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerResponse implements Serializable, Writable {
+
+  private String workerId;
+
+  public RegisterWorkerResponse() {
+  }
+
+  public RegisterWorkerResponse(String workerId) {
+    this.workerId = workerId;
+  }
+
+  public String getWorkerId() {
+    return workerId;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(workerId);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    workerId = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
new file mode 100644
index 0000000..7a25944
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownRequest implements Serializable, Writable {
+  private String reason;
+
+  public ShutdownRequest() {
+  }
+
+  public ShutdownRequest(String reason) {
+    this.reason = reason;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(reason);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reason = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
new file mode 100644
index 0000000..f6f329f
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownResponse implements Serializable, Writable {
+  private int status;
+  private String message;
+
+  public ShutdownResponse() {
+  }
+
+  public ShutdownResponse(int status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(status);
+    out.writeUTF(message);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    message = in.readUTF();
+  }
+}


[2/3] carbondata git commit: [CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework

Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
new file mode 100644
index 0000000..2109251
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.net.{BindException, InetAddress}
+import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
+import java.util.concurrent.{ExecutionException, Future, TimeoutException, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.ipc.RPC
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.store.rpc.{RegistryService, ServiceFactory}
+import org.apache.carbondata.store.rpc.impl.{RegistryServiceImpl, Status}
+import org.apache.carbondata.store.rpc.model._
+
+/**
+ * Master of CarbonSearch.
+ * It provides a registry service for workers to register,
+ * and a search API that fires RPC calls to the workers.
+ */
+@InterfaceAudience.Internal
+private[store] class Master {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+
+  private val random = new Random
+
+  private var registryServer: RPC.Server = _
+
+  private val scheduler: Scheduler = new Scheduler
+
+  def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
+    val hadoopConf = FileFactory.getConfiguration
+    val builder = new RPC.Builder(hadoopConf)
+    builder
+      .setBindAddress(serverHost)
+      .setPort(serverPort)
+      .setProtocol(classOf[RegistryService])
+      .setInstance(new RegistryServiceImpl(this))
+      .build
+  }
+
+  /** start the registry service and listen on the configured port */
+  def startService(): Unit = {
+    if (registryServer == null) {
+      LOG.info("Start search mode master thread")
+      val isStarted: AtomicBoolean = new AtomicBoolean(false)
+      new Thread(new Runnable {
+        override def run(): Unit = {
+          val hostAddress = InetAddress.getLocalHost.getHostAddress
+          var port = CarbonProperties.getSearchMasterPort
+          var exception: BindException = null
+          var numTry = 100  // in the worst case, try to create the service 100 times
+          do {
+            try {
+              LOG.info(s"building registry-service on $hostAddress:$port")
+              registryServer = buildServer(hostAddress, port)
+              numTry = 0
+            } catch {
+              case e: BindException =>
+                // port is occupied, increase the port number and try again
+                exception = e
+                LOG.error(s"start registry-service failed: ${e.getMessage}")
+                port = port + 1
+                numTry = numTry - 1
+            }
+          } while (numTry > 0)
+          if (registryServer == null) {
+            // we have tried many times, but still failed to find an available port
+            throw exception
+          }
+          // signal the waiting thread that the server has been created
+          isStarted.set(true)
+          LOG.info("starting registry-service")
+          registryServer.start()
+          LOG.info("registry-service started")
+        }
+      }).start()
+      var count = 0
+      val countThreshold = 5000
+      while (!isStarted.get() && count < countThreshold) {
+        LOG.info(s"Waiting for search mode master to start, attempt $count")
+        Thread.sleep(10)
+        count = count + 1
+      }
+      if (count >= countThreshold) {
+        LOG.error(s"Search mode try $countThreshold times to start master but failed")
+        throw new RuntimeException(
+          s"Search mode try $countThreshold times to start master but failed")
+      } else {
+        LOG.info("Search mode master started")
+      }
+    } else {
+      LOG.info("Search mode master has already started")
+    }
+  }
+
+  def stopService(): Unit = {
+    if (registryServer != null) {
+      registryServer.stop()
+      registryServer.join()
+      registryServer = null
+    }
+  }
+
+  def stopAllWorkers(): Unit = {
+    scheduler.getAllWorkers.toSeq.foreach { case (address, schedulable) =>
+      try {
+        schedulable.service.shutdown(new ShutdownRequest("user"))
+      } catch {
+        case throwable: Throwable =>
+          throw new IOException(throwable)
+      }
+      scheduler.removeWorker(address)
+    }
+  }
+
+  /** A new worker is trying to register: record it in the scheduler and connect to it */
+  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
+    LOG.info(s"Receive Register request from worker ${request.getHostAddress}:${request.getPort} " +
+             s"with ${request.getCores} cores")
+    val workerId = UUID.randomUUID().toString
+    val workerAddress = request.getHostAddress
+    val workerPort = request.getPort
+    LOG.info(s"connecting to worker ${request.getHostAddress}:${request.getPort}, " +
+             s"workerId $workerId")
+
+    val searchService = ServiceFactory.createSearchService(workerAddress, workerPort)
+    scheduler.addWorker(workerAddress,
+      new Schedulable(workerId, workerAddress, workerPort, request.getCores, searchService))
+    LOG.info(s"worker ${request.getHostAddress}:${request.getPort} registered")
+    new RegisterWorkerResponse(workerId)
+  }
+
+  /**
+   * Execute search by firing RPC calls to workers, return the result rows
+   * @param table table to search
+   * @param columns projection column names
+   * @param filter filter expression
+   * @param globalLimit max number of rows required in Master
+   * @param localLimit max number of rows required in Worker
+   * @return result rows collected from all workers
+   */
+  def search(table: CarbonTable, columns: Array[String], filter: Expression,
+      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
+    Objects.requireNonNull(table)
+    Objects.requireNonNull(columns)
+    if (globalLimit < 0 || localLimit < 0) {
+      throw new IllegalArgumentException("limit should not be negative")
+    }
+
+    val queryId = random.nextInt
+    var rowCount = 0
+    val output = new ArrayBuffer[CarbonRow]
+
+    def onSuccess(result: QueryResponse): Unit = {
+      // in case of RPC success, collect all rows in response message
+      if (result.getQueryId != queryId) {
+        throw new IOException(
+          s"queryId in response does not match request: ${result.getQueryId} != $queryId")
+      }
+      if (result.getStatus != Status.SUCCESS.ordinal()) {
+        throw new IOException(s"failure in worker: ${ result.getMessage }")
+      }
+
+      val iter = result.getRows.iterator
+      while (iter.hasNext && rowCount < globalLimit) {
+        output += new CarbonRow(iter.next())
+        rowCount = rowCount + 1
+      }
+      LOG.info(s"[QueryId:$queryId] accumulated result size $rowCount")
+    }
+    def onFailure(e: Throwable) = throw new IOException(s"exception in worker: ${e.getMessage}")
+    def onTimeout() = throw new ExecutionTimeoutException()
+
+    // prune data and get a mapping of worker hostname to list of blocks,
+    // then add these blocks to the QueryRequest and fire the RPC call
+    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
+    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
+      // Build a QueryRequest
+      val split = new CarbonMultiBlockSplit(blocks, splitAddress)
+      val request =
+        new QueryRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
+
+      // Find an Endpoint and send the request to it.
+      // The RPC is non-blocking, so we do not need to wait before sending to the next worker
+      scheduler.sendRequestAsync(splitAddress, request)
+    }
+
+    // loop to get the result of each Worker
+    tuple.foreach { case (worker: Schedulable, future: Future[QueryResponse]) =>
+
+      // if we have enough data already, we do not need to collect more results
+      if (rowCount < globalLimit) {
+        // wait for worker
+        val response = try {
+          future.get(CarbonProperties.getInstance().getQueryTimeout.toLong, TimeUnit.SECONDS)
+        } catch {
+          case e: ExecutionException => onFailure(e)
+          case t: TimeoutException => onTimeout()
+        } finally {
+          worker.workload.decrementAndGet()
+        }
+        LOG.info(s"[QueryId:$queryId] receive search response from worker " +
+                 s"${worker.address}:${worker.port}")
+        onSuccess(response)
+      }
+    }
+    output.toArray
+  }
+
+  /**
+   * Prune data by using CarbonInputFormat.getSplits
+   * Return a mapping of host address to list of blocks
+   */
+  private def pruneBlock(
+      table: CarbonTable,
+      columns: Array[String],
+      filter: Expression): JMap[String, JList[Distributable]] = {
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+      job, table, columns, filter, null, null)
+
+    // We will do FG pruning on the reader side, so don't do it here
+    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
+    val splits = format.getSplits(job)
+    val distributables = splits.asScala.map { split =>
+      split.asInstanceOf[Distributable]
+    }
+    CarbonLoaderUtil.nodeBlockMapping(
+      distributables.asJava,
+      -1,
+      getWorkers.asJava,
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
+      null)
+  }
+
+  /** return host addresses of all workers */
+  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
+}
+
+// Exception thrown when execution times out in search mode
+class ExecutionTimeoutException extends RuntimeException
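
A minimal sketch of how this Master is meant to be driven (illustration
only; `table` and `filterExpr` stand for a CarbonTable and a filter
Expression prepared by the caller, they are not part of this patch):

    val master = new Master
    master.startService()                 // binds the registry service
    // table: CarbonTable, filterExpr: Expression -- assumed to exist
    val rows = master.search(
      table,                              // table to scan
      Array("name", "age"),               // projection columns
      filterExpr,                         // filter expression
      globalLimit = 1000,                 // max rows gathered in Master
      localLimit = 1000)                  // max rows returned per Worker
    rows.foreach(println)
    master.stopAllWorkers()
    master.stopService()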

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
new file mode 100644
index 0000000..fb3ef86
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.util.concurrent.{Callable, Executors, Future}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.QueryService
+import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse}
+
+/**
+ * [[Master]] uses Scheduler to pick a Worker to send request
+ */
+@InterfaceAudience.Internal
+private[store] class Scheduler {
+  // mapping of worker IP address to worker instance
+  private val workers = mutable.Map[String, Schedulable]()
+  private val random = new Random()
+  private val executors = Executors.newCachedThreadPool()
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Pick a Worker according to the address and workload of the Worker
+   * Invoke the RPC and return Future result
+   */
+  def sendRequestAsync(
+      splitAddress: String,
+      request: QueryRequest): (Schedulable, Future[QueryResponse]) = {
+    require(splitAddress != null)
+    if (workers.isEmpty) {
+      throw new IOException("No worker is available")
+    }
+    var worker: Schedulable = pickWorker(splitAddress)
+
+    // check whether worker exceed max workload, if exceeded, pick next worker
+    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
+    var numTry = workers.size
+    do {
+      if (worker.workload.get() >= maxWorkload) {
+        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
+        worker = pickNextWorker(worker)
+        numTry = numTry - 1
+      } else {
+        numTry = -1
+      }
+    } while (numTry > 0)
+    if (numTry == 0) {
+      // tried so many times and still not able to find Worker
+      throw new WorkerTooBusyException(
+        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
+    }
+    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
+    val future = executors.submit(
+      new Callable[QueryResponse] {
+        override def call(): QueryResponse = worker.service.query(request)
+      }
+    )
+    worker.workload.incrementAndGet()
+    (worker, future)
+  }
+
+  private def pickWorker(splitAddress: String) = {
+    try {
+      workers(splitAddress)
+    } catch {
+      case e: NoSuchElementException =>
+        // no local worker available, choose one worker randomly
+        pickRandomWorker()
+    }
+  }
+
+  /** pick a worker randomly */
+  private def pickRandomWorker() = {
+    val index = random.nextInt(workers.size)
+    workers.toSeq(index)._2
+  }
+
+  /** pick the next worker after the given one in [[Scheduler.workers]] */
+  private def pickNextWorker(worker: Schedulable) = {
+    val index = workers.zipWithIndex.find { case ((_, w), _) =>
+      w == worker
+    }.get._2
+    if (index == workers.size - 1) {
+      workers.toSeq.head._2
+    } else {
+      workers.toSeq(index + 1)._2
+    }
+  }
+
+  /** Add a newly registered worker to the map */
+  def addWorker(address: String, schedulable: Schedulable): Unit = {
+    require(schedulable != null)
+    require(address.equals(schedulable.address))
+    workers(address) = schedulable
+  }
+
+  def removeWorker(address: String): Unit = {
+    workers.remove(address)
+  }
+
+  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
+}
+
+/**
+ * Represents a Worker to which [[Scheduler]] can send search requests
+ * @param id Worker ID, a UUID string
+ * @param address host address of the Worker
+ * @param port port the Worker listens on
+ * @param cores number of cores in the Worker
+ * @param service RPC service reference
+ * @param workload number of outstanding requests sent to the Worker
+ */
+private[store] class Schedulable(
+    val id: String,
+    val address: String,
+    val port: Int,
+    val cores: Int,
+    val service: QueryService,
+    val workload: AtomicInteger) {
+  def this(id: String, address: String, port: Int, cores: Int, service: QueryService) = {
+    this(id, address, port, cores, service, new AtomicInteger())
+  }
+}
+
+class WorkerTooBusyException(message: String) extends RuntimeException(message)
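
For illustration, a sketch of the contract between Master and Scheduler
(`queryService` and `request` are stand-ins for a connected QueryService
and a prepared QueryRequest; they are not part of this patch):

    val scheduler = new Scheduler
    scheduler.addWorker("10.0.0.1",
      new Schedulable("id-1", "10.0.0.1", 10020, 4, queryService))
    val (worker, future) = scheduler.sendRequestAsync("10.0.0.1", request)
    try {
      // QueryResponse arrives when the worker finishes the scan
      val response = future.get(30, java.util.concurrent.TimeUnit.SECONDS)
    } finally {
      // sendRequestAsync increments the workload counter; the caller
      // decrements it once the call completes (see Master.search)
      worker.workload.decrementAndGet()
    }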

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
new file mode 100644
index 0000000..2ded00b
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.net.{BindException, InetAddress}
+
+import org.apache.hadoop.ipc.RPC
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.{QueryService, RegistryService, ServiceFactory}
+import org.apache.carbondata.store.rpc.impl.QueryServiceImpl
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest
+
+@InterfaceAudience.Internal
+private[store] object Worker {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val hostAddress = InetAddress.getLocalHost.getHostAddress
+  private var port: Int = _
+  private var registry: RegistryService = _
+
+  def init(masterHostAddress: String, masterPort: Int): Unit = {
+    LOG.info(s"initializing worker...")
+    startService()
+    LOG.info(s"registering to master $masterHostAddress:$masterPort")
+    val workerId = registerToMaster(masterHostAddress, masterPort)
+    LOG.info(s"worker registered to master, workerId: $workerId")
+  }
+
+  def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
+    val hadoopConf = FileFactory.getConfiguration
+    val builder = new RPC.Builder(hadoopConf)
+    builder
+      .setNumHandlers(Runtime.getRuntime.availableProcessors)
+      .setBindAddress(serverHost)
+      .setPort(serverPort)
+      .setProtocol(classOf[QueryService])
+      .setInstance(new QueryServiceImpl)
+      .build
+  }
+
+  /**
+   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
+   */
+  private def startService(): Unit = {
+    new Thread(new Runnable {
+      override def run(): Unit = {
+        port = CarbonProperties.getSearchWorkerPort
+        var searchServer: RPC.Server = null
+        var exception: BindException = null
+        var numTry = 100  // try to create the service at most 100 times
+        do {
+          try {
+            LOG.info(s"building search-service on $hostAddress:$port")
+            searchServer = buildServer(hostAddress, port)
+            numTry = 0
+          } catch {
+            case e: BindException =>
+              // port is occupied, increase the port number and try again
+              exception = e
+              LOG.error(s"start search-service failed: ${e.getMessage}")
+              port = port + 1
+              numTry = numTry - 1
+          }
+        } while (numTry > 0)
+        if (searchServer == null) {
+          // we have tried many times, but still failed to find an available port
+          throw exception
+        }
+        LOG.info("starting search-service")
+        searchServer.start()
+        LOG.info("search-service started")
+      }
+    }).start()
+  }
+
+  private def registerToMaster(registryHostAddress: String, registryPort: Int): String = {
+    LOG.info(s"trying to register to master $registryHostAddress:$registryPort")
+    if (registry == null) {
+      registry = ServiceFactory.createRegistryService(registryHostAddress, registryPort)
+    }
+    val cores = Runtime.getRuntime.availableProcessors()
+    val request = new RegisterWorkerRequest(hostAddress, port, cores)
+    val response = try {
+      registry.registerWorker(request)
+    } catch {
+      case throwable: Throwable =>
+        LOG.error(s"worker failed to registered: $throwable")
+        throw new IOException(throwable)
+    }
+
+    LOG.info("worker registered")
+    response.getWorkerId
+  }
+}
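
A sketch of bringing a worker up against a running master (the host
address is illustrative):

    // starts the local query service on CarbonProperties.getSearchWorkerPort,
    // retrying higher ports on BindException, then registers with the master
    Worker.init(masterHostAddress = "10.0.0.1",
      masterPort = CarbonProperties.getSearchMasterPort)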

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
new file mode 100644
index 0000000..c885a26
--- /dev/null
+++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+import org.apache.carbondata.sdk.file.TestUtil;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LocalCarbonStoreTest {
+  @Before
+  public void cleanFile() {
+    assert (TestUtil.cleanMdtFile());
+  }
+
+  @After
+  public void verifyDMFile() {
+    assert (!TestUtil.verifyMdtFile());
+  }
+
+  // TODO: complete this testcase
+  // Currently result rows are empty, because SDK is not writing table status file
+  // so that reader does not find any segment.
+  // Complete this testcase after flat folder reader is done.
+  @Test
+  public void testWriteAndReadFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
+
+    CarbonStore store = new LocalCarbonStore();
+    Iterator<CarbonRow> rows = store.scan(path, new String[]{"name", "age"}, null);
+
+    while (rows.hasNext()) {
+      CarbonRow row = rows.next();
+      System.out.println(row.toString());
+    }
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
new file mode 100644
index 0000000..9b9aa9e
--- /dev/null
+++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.junit.Assert;
+
+public class TestUtil {
+
+  static void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true);
+  }
+
+  public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true);
+  }
+
+  public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema,
+      boolean isTransactionalTable) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+  }
+
+  /**
+   * write file and verify
+   *
+   * @param rows                 number of rows
+   * @param schema               schema
+   * @param path                 table store path
+   * @param persistSchema        whether persist schema
+   * @param isTransactionalTable whether is transactional table
+   */
+  public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema,
+      boolean isTransactionalTable) {
+    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert the files are written
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if want to persist schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   * @param isTransactionalTable set to true if this is written for Transactional Table.
+   */
+  static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .isTransactionalTable(isTransactionalTable)
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = null;
+    if (isTransactionalTable) {
+      segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+      Assert.assertTrue(segmentFolder.exists());
+    } else {
+      segmentFolder = new File(path);
+      Assert.assertTrue(segmentFolder.exists());
+    }
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+
+  /**
+   * delete the mdt file if it exists
+   * return true if the file was deleted successfully or did not exist; otherwise return false
+   *
+   * @return boolean
+   */
+  public static boolean cleanMdtFile() {
+    String fileName = CarbonProperties.getInstance().getSystemFolderLocation()
+        + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+    try {
+      if (FileFactory.isFileExist(fileName)) {
+        File file = new File(fileName);
+        return file.delete();
+      } else {
+        return true;
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      return false;
+    }
+  }
+
+  /**
+   * verify whether the mdt file exists
+   * if the file exists, then return true; otherwise return false
+   *
+   * @return boolean
+   */
+  public static boolean verifyMdtFile() {
+    String fileName = CarbonProperties.getInstance().getSystemFolderLocation()
+        + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+    try {
+      return FileFactory.isFileExist(fileName);
+    } catch (IOException e) {
+      throw new RuntimeException("IO exception:", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
new file mode 100644
index 0000000..95e7335
--- /dev/null
+++ b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import org.apache.hadoop.ipc.ProtocolSignature
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.QueryService
+import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse, ShutdownRequest, ShutdownResponse}
+
+class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
+
+  var scheduler: Scheduler = _
+  var w1: Schedulable = _
+  var w2: Schedulable = _
+  var w3: Schedulable = _
+
+  override def beforeEach(): Unit = {
+    scheduler = new Scheduler()
+    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
+    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
+    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
+
+    scheduler.addWorker("1.1.1.1", w1)
+    scheduler.addWorker("1.1.1.2", w2)
+    scheduler.addWorker("1.1.1.3", w3)
+  }
+
+  test("test addWorker, removeWorker, getAllWorkers") {
+    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    scheduler.removeWorker("1.1.1.2")
+    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
+    scheduler.addWorker("1.1.1.4", w4)
+    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
+    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
+  }
+
+  test("test normal schedule") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r1.id)
+    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r2.id)
+    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r3.id)
+    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r4.id)
+    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r5.id)
+    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r6.id)
+  }
+
+  test("test worker unavailable") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
+    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
+  }
+
+  test("test reschedule when target worker is overload") {
+    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+    (1 to 40).foreach { _ =>
+      scheduler.sendRequestAsync("1.1.1.2", null)
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    // it must be worker1 since worker3 exceed max workload
+    assertResult(w1.id)(r.id)
+  }
+
+  test("test all workers are overload") {
+    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+    (1 to 40).foreach { _ =>
+      scheduler.sendRequestAsync("1.1.1.1", null)
+      scheduler.sendRequestAsync("1.1.1.2", null)
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+  }
+
+  test("test user configured overload param") {
+    val original = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
+
+    (1 to 3).foreach { _ =>
+      scheduler.sendRequestAsync("1.1.1.1", null)
+      scheduler.sendRequestAsync("1.1.1.2", null)
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    if (original != null) {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
+    }
+  }
+
+  test("test invalid property") {
+    intercept[IllegalArgumentException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
+    }
+    var value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+
+    intercept[NumberFormatException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
+    }
+    value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+  }
+}
+
+class DummyRef extends QueryService {
+  override def query(request: QueryRequest): QueryResponse = ???
+
+  override def shutdown(request: ShutdownRequest): ShutdownResponse = ???
+
+  override def getProtocolVersion(protocol: String,
+      clientVersion: Long): Long = ???
+
+  override def getProtocolSignature(protocol: String,
+      clientVersion: Long,
+      clientMethodsHash: Int): ProtocolSignature = ???
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
deleted file mode 100644
index bafbb9f..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-
-/**
- * ReadSupport that convert row object to CarbonRow
- */
-@InterfaceAudience.Internal
-public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
-  private CarbonReadSupport<Object[]> delegate;
-
-  public CarbonRowReadSupport() {
-    this.delegate = new DictionaryDecodeReadSupport<>();
-  }
-
-  @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
-      throws IOException {
-    delegate.initialize(carbonColumns, carbonTable);
-  }
-
-  @Override public CarbonRow readRow(Object[] data) {
-    Object[] converted = delegate.readRow(data);
-    return new CarbonRow(converted);
-  }
-
-  @Override public void close() {
-    delegate.close();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
deleted file mode 100644
index c6b2fb8..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.scan.expression.Expression;
-
-/**
- * User can use {@link CarbonStore} to query data
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface CarbonStore extends Closeable {
-
-  /**
-   * Scan query on the data in the table path
-   * @param path table path
-   * @param projectColumns column names to read
-   * @return rows
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> scan(
-      String path,
-      String[] projectColumns) throws IOException;
-
-  /**
-   * Scan query with filter, on the data in the table path
-   * @param path table path
-   * @param projectColumns column names to read
-   * @param filter filter condition, can be null
-   * @return rows that satisfy filter condition
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> scan(
-      String path,
-      String[] projectColumns,
-      Expression filter) throws IOException;
-
-  /**
-   * SQL query, table should be created before calling this function
-   * @param sqlString SQL statement
-   * @return rows
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> sql(String sqlString) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
deleted file mode 100644
index daa1447..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A CarbonStore implementation that works locally, without other compute framework dependency.
- * It can be used to read data in local disk.
- *
- * Note that this class is experimental, it is not intended to be used in production.
- */
-@InterfaceAudience.Internal
-class LocalCarbonStore extends MetaCachedCarbonStore {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
-
-  @Override
-  public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException {
-    return scan(path, projectColumns, null);
-  }
-
-  @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
-      throws IOException {
-    Objects.requireNonNull(path);
-    Objects.requireNonNull(projectColumns);
-
-    CarbonTable table = getTable(path);
-    if (table.isStreamingSink() || table.isHivePartitionTable()) {
-      throw new UnsupportedOperationException("streaming and partition table is not supported");
-    }
-    // TODO: use InputFormat to prune data and read data
-
-    final CarbonTableInputFormat format = new CarbonTableInputFormat();
-    final Job job = new Job(new Configuration());
-    CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo());
-    CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath());
-    CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
-    CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
-    CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
-    CarbonInputFormat
-        .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
-    if (filter != null) {
-      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
-    }
-
-    final List<InputSplit> splits =
-        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-
-    List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
-
-    List<CarbonRow> rows = new ArrayList<>();
-
-    try {
-      for (InputSplit split : splits) {
-        TaskAttemptContextImpl attempt =
-            new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-        RecordReader reader = format.createRecordReader(split, attempt);
-        reader.initialize(split, attempt);
-        readers.add(reader);
-      }
-
-      for (RecordReader<Void, Object> reader : readers) {
-        while (reader.nextKeyValue()) {
-          rows.add((CarbonRow) reader.getCurrentValue());
-        }
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOGGER.error(e);
-        }
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      for (RecordReader<Void, Object> reader : readers) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOGGER.error(e);
-        }
-      }
-    }
-    return rows.iterator();
-  }
-
-  @Override
-  public Iterator<CarbonRow> sql(String sqlString) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
deleted file mode 100644
index e43f750..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * A CarbonStore base class that caches CarbonTable object
- */
-@InterfaceAudience.Internal
-abstract class MetaCachedCarbonStore implements CarbonStore {
-
-  // mapping of table path to CarbonTable object
-  private Map<String, CarbonTable> cache = new HashMap<>();
-
-  CarbonTable getTable(String path) throws IOException {
-    if (cache.containsKey(path)) {
-      return cache.get(path);
-    }
-    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
-    tableInfo1.setTablePath(path);
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1);
-    cache.put(path, table);
-    return table;
-  }
-
-  @Override
-  public void close() throws IOException {
-    cache.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
deleted file mode 100644
index c885a26..0000000
--- a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.sdk.file.TestUtil;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class LocalCarbonStoreTest {
-  @Before
-  public void cleanFile() {
-    assert (TestUtil.cleanMdtFile());
-  }
-
-  @After
-  public void verifyDMFile() {
-    assert (!TestUtil.verifyMdtFile());
-  }
-
-  // TODO: complete this testcase
-  // Currently result rows are empty, because SDK is not writing table status file
-  // so that reader does not find any segment.
-  // Complete this testcase after flat folder reader is done.
-  @Test
-  public void testWriteAndReadFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
-
-    CarbonStore store = new LocalCarbonStore();
-    Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null);
-
-    while (rows.hasNext()) {
-      CarbonRow row = rows.next();
-      System.out.println(row.toString());
-    }
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
deleted file mode 100644
index 6acbbfb..0000000
--- a/store/search/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.carbondata</groupId>
-    <artifactId>carbondata-parent</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>carbondata-search</artifactId>
-  <name>Apache CarbonData :: Search </name>
-
-  <properties>
-    <dev.path>${basedir}/../../dev</dev.path>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
deleted file mode 100644
index 0a3110e..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.spark.search.SearchRequest;
-import org.apache.spark.search.SearchResult;
-import org.apache.spark.search.ShutdownRequest;
-import org.apache.spark.search.ShutdownResponse;
-
-/**
- * Handles a SearchRequest sent from the master.
- */
-@InterfaceAudience.Internal
-public class SearchRequestHandler {
-
-  private static final LogService LOG =
-      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
-
-  public SearchResult handleSearch(SearchRequest request) {
-    try {
-      LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
-      List<CarbonRow> rows = handleRequest(request);
-      LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
-      return createSuccessResponse(request, rows);
-    } catch (IOException | InterruptedException e) {
-      LOG.error(e);
-      LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
-      return createFailureResponse(request, e);
-    }
-  }
-
-  public ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOG.info("Shutting down worker...");
-    SearchModeDetailQueryExecutor.shutdownThreadPool();
-    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOG.info("Worker shutted down");
-    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
-  }
-
-  private DataMapExprWrapper chooseFGDataMap(
-          CarbonTable table,
-          FilterResolverIntf filterInterface) {
-    try {
-      DataMapChooser chooser = new DataMapChooser(table);
-      return chooser.chooseFGDataMap(filterInterface);
-    } catch (IOException e) {
-      LOG.audit(e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * Builds {@link QueryModel} and reads data from files
-   */
-  private List<CarbonRow> handleRequest(SearchRequest request)
-      throws IOException, InterruptedException {
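-    // tag this handler thread with a unique task id so that lower layers
-    // can pick it up via ThreadLocalTaskInfo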
-    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
-    carbonTaskInfo.setTaskId(System.nanoTime());
-    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-    TableInfo tableInfo = request.tableInfo();
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
-    QueryModel queryModel = createQueryModel(table, request);
-
-    // in search mode, plain reader is better since it requires less memory
-    queryModel.setVectorReader(false);
-
-    CarbonMultiBlockSplit mbSplit = request.split().value();
-    List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
-    queryModel.setTableBlockInfos(list);
-    long limit = request.limit();
-    long rowCount = 0;
-
-    LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
-        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
-    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
-            queryModel.getFilterExpressionResolverTree());
-
-    // If there is DataMap selected in Master, prune the split by it
-    if (fgDataMap != null) {
-      queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
-    }
-
-    // In search mode, the reader reads multiple blocks using a thread pool
-    CarbonRecordReader<CarbonRow> reader =
-        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
-
-    // read all rows by the reader
-    List<CarbonRow> rows = new LinkedList<>();
-    try {
-      reader.initialize(mbSplit, null);
-
-      // loop to read the required number of rows.
-      // If the user does not specify a limit, the limit defaults to Long.MaxValue
-      while (rowCount < limit && reader.nextKeyValue()) {
-        rows.add(reader.getCurrentValue());
-        rowCount++;
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      reader.close();
-    }
-    LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
-        request.searchId(), rows.size()));
-    return rows;
-  }
-
-  /**
-   * If an FG DataMap is defined for this table and the query contains a filter condition,
-   * prune the splits using the DataMap, set the pruned splits into the QueryModel and return it
-   */
-  private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
-      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
-    Objects.requireNonNull(datamap);
-    List<Segment> segments = new LinkedList<>();
-    HashMap<String, Integer> uniqueSegments = new HashMap<>();
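-    // uniqueSegments deduplicates segments across splits; the map value
-    // counts how many splits fall into each segment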
-    LoadMetadataDetails[] loadMetadataDetails =
-        SegmentStatusManager.readLoadMetadata(
-            CarbonTablePath.getMetadataPath(table.getTablePath()));
-    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
-      if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(segmentId,
-            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-                loadMetadataDetails)));
-        uniqueSegments.put(segmentId, 1);
-      } else {
-        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
-      }
-    }
-
-    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
-    List<ExtendedBlocklet> prunedBlocklets = new LinkedList<>();
-    for (DataMapDistributableWrapper wrapper : distributables) {
-      DataMapDistributable dataMapDistributable = wrapper.getDistributable();
-      prunedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
-    }
-
-    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
-    for (ExtendedBlocklet prunedBlocklet : prunedBlocklets) {
-      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
-    }
-
-    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
-    List<TableBlockInfo> blockToRead = new LinkedList<>();
-    for (TableBlockInfo block : blocks) {
-      if (pathToRead.containsKey(block.getFilePath())) {
-        // If this path is not set, the FineGrainBlocklet object cannot be created in
-        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
-        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
-        blockToRead.add(block);
-      }
-    }
-    LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
-        blockToRead.size()));
-    queryModel.setTableBlockInfos(blockToRead);
-    queryModel.setFG(true);
-    return queryModel;
-  }
-
-  private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
-    String[] projectColumns = request.projectColumns();
-    Expression filter = request.filterExpression();
-    return new QueryModelBuilder(table)
-        .projectColumns(projectColumns)
-        .filterExpression(filter)
-        .build();
-  }
-
-  /**
-   * create a failure response
-   */
-  private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
-    return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
-        new Object[0][]);
-  }
-
-  /**
-   * create a success response with result rows
-   */
-  private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
-    Iterator<CarbonRow> itor = rows.iterator();
-    Object[][] output = new Object[rows.size()][];
-    int i = 0;
-    while (itor.hasNext()) {
-      output[i++] = itor.next().getData();
-    }
-    return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
-  }
-
-}

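The handler above boils down to a bounded read loop: scan until either the data or the
requested limit is exhausted, and always close the reader. A minimal Scala sketch of that
pattern, using a hypothetical Reader trait as a stand-in for CarbonRecordReader (not the
CarbonData API):

    // Hypothetical stand-in for CarbonRecordReader; only the loop shape matters here.
    trait Reader[T] {
      def nextKeyValue(): Boolean
      def getCurrentValue: T
      def close(): Unit
    }

    def readUpTo[T](reader: Reader[T], limit: Long): List[T] = {
      val rows = scala.collection.mutable.ListBuffer.empty[T]
      var count = 0L
      try {
        // checking the limit before advancing avoids consuming one row past the limit
        while (count < limit && reader.nextKeyValue()) {
          rows += reader.getCurrentValue
          count += 1
        }
      } finally {
        reader.close() // release resources even if the scan throws
      }
      rows.toList
    }
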
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
deleted file mode 100644
index 71df3e0..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
-  SUCCESS, FAILURE
-}

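As the handler code shows, Status crosses the RPC boundary as its ordinal
(Status.SUCCESS.ordinal()). A small sketch of that encode/decode round trip in Scala,
with an Enumeration standing in for the Java enum (values keep declaration order, so
SUCCESS = 0 and FAILURE = 1):

    // Scala stand-in for the Java enum above.
    object Status extends Enumeration {
      val SUCCESS, FAILURE = Value
    }

    // Encode for the wire; decode on receipt. Status(code) throws
    // NoSuchElementException for an unknown code, surfacing protocol mismatches early.
    def encode(s: Status.Value): Int = s.id
    def decode(code: Int): Status.Value = Status(code)
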
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
deleted file mode 100644
index b7630fb..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search._
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.worker.Status
-
-/**
- * Master of CarbonSearch.
- * It provides a registry service for workers to register with,
- * and a search API that fires RPC calls to the workers.
- */
-@InterfaceAudience.Internal
-class Master(sparkConf: SparkConf) {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  // workers are tracked by the Scheduler, keyed by worker host address
-
-  private val random = new Random
-
-  private var rpcEnv: RpcEnv = _
-
-  private val scheduler: Scheduler = new Scheduler
-
-  /** Start the registry service, listening on the configured search master port */
-  def startService(): Unit = {
-    if (rpcEnv == null) {
-      LOG.info("Start search mode master thread")
-      val isStarted: AtomicBoolean = new AtomicBoolean(false)
-      new Thread(new Runnable {
-        override def run(): Unit = {
-          val hostAddress = InetAddress.getLocalHost.getHostAddress
-          var port = CarbonProperties.getSearchMasterPort
-          var exception: BindException = null
-          var numTry = 100  // in the worst case, try to create the service 100 times
-          do {
-            try {
-              LOG.info(s"starting registry-service on $hostAddress:$port")
-              val config = RpcEnvConfig(
-                sparkConf, "registry-service", hostAddress, "", port,
-                new SecurityManager(sparkConf), clientMode = false)
-              rpcEnv = new NettyRpcEnvFactory().create(config)
-              numTry = 0
-            } catch {
-              case e: BindException =>
-                // port is occupied, increase the port number and try again
-                exception = e
-                LOG.error(s"start registry-service failed: ${e.getMessage}")
-                port = port + 1
-                numTry = numTry - 1
-            }
-          } while (numTry > 0)
-          if (rpcEnv == null) {
-            // we have tried many times, but still failed to find an available port
-            throw exception
-          }
-          val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
-          rpcEnv.setupEndpoint("registry-service", registryEndpoint)
-          isStarted.set(true)
-          LOG.info("registry-service started")
-          rpcEnv.awaitTermination()
-        }
-      }).start()
-      var count = 0
-      val countThreshold = 5000
-      while (!isStarted.get() && count < countThreshold) {
-        LOG.info(s"Waiting for search mode master to start, retry $count")
-        Thread.sleep(10)
-        count = count + 1
-      }
-      if (count >= countThreshold) {
-        LOG.error(s"Search mode try $countThreshold times to start master but failed")
-        throw new RuntimeException(
-          s"Search mode try $countThreshold times to start master but failed")
-      } else {
-        LOG.info("Search mode master started")
-      }
-    } else {
-      LOG.info("Search mode master has already started")
-    }
-  }
-
-  def stopService(): Unit = {
-    if (rpcEnv != null) {
-      rpcEnv.shutdown()
-      rpcEnv = null
-    }
-  }
-
-  def stopAllWorkers(): Unit = {
-    val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
-      (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
-    }
-    futures.foreach { case (address, future) =>
-      ThreadUtils.awaitResult(future, Duration.apply("10s"))
-      future.value match {
-        case Some(result) =>
-          result match {
-            case Success(response) => scheduler.removeWorker(address)
-            case Failure(throwable) => throw new IOException(throwable.getMessage)
-          }
-        case None => throw new ExecutionTimeoutException
-      }
-    }
-  }
-
-  /** A new worker is trying to register; add it to the map and connect to it */
-  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
-    LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
-             s"with ${request.cores} cores")
-    val workerId = UUID.randomUUID().toString
-    val workerAddress = request.hostAddress
-    val workerPort = request.port
-    LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
-
-    val endPointRef =
-      rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
-    scheduler.addWorker(workerAddress,
-      new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
-    LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
-    RegisterWorkerResponse(workerId)
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   * @param table table to search
-   * @param columns projection column names
-   * @param filter filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit max number of rows required in Worker
-   * @return result rows collected from the workers
-   */
-  def search(table: CarbonTable, columns: Array[String], filter: Expression,
-      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
-    Objects.requireNonNull(table)
-    Objects.requireNonNull(columns)
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive")
-    }
-
-    val queryId = random.nextInt
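-    // random id used to correlate worker responses with this request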
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
-
-    def onSuccess(result: SearchResult): Unit = {
-      // in case of RPC success, collect all rows in response message
-      if (result.queryId != queryId) {
-        throw new IOException(
-          s"queryId in response does not match request: ${result.queryId} != $queryId")
-      }
-      if (result.status != Status.SUCCESS.ordinal()) {
-        throw new IOException(s"failure in worker: ${ result.message }")
-      }
-
-      val itor = result.rows.iterator
-      while (itor.hasNext && rowCount < globalLimit) {
-        output += new CarbonRow(itor.next())
-        rowCount = rowCount + 1
-      }
-      LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
-    }
-    def onFailure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
-    def onTimedOut() = throw new ExecutionTimeoutException()
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the SearchRequest and fire the RPC call
-    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
-      // Build a SearchRequest
-      val split = new SerializableWritable[CarbonMultiBlockSplit](
-        new CarbonMultiBlockSplit(blocks, splitAddress))
-      val request =
-        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
-      // Find an endpoint and send the request to it.
-      // This RPC is non-blocking, so we do not need to wait before sending to the next worker
-      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
-    }
-
-    // loop to get the result of each Worker
-    tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
-
-      // if we already have enough data, we do not need to collect more results
-      if (rowCount < globalLimit) {
-        // wait for worker
-        val timeout = CarbonProperties
-          .getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
-            CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
-        ThreadUtils.awaitResult(future, Duration.apply(timeout))
-        LOG.info(s"[SearchId:$queryId] receive search response from worker " +
-          s"${worker.address}:${worker.port}")
-        try {
-          future.value match {
-            case Some(response: Try[SearchResult]) =>
-              response match {
-                case Success(result) => onSuccess(result)
-                case Failure(e) => onFailure(e)
-              }
-            case None => onTimedOut()
-          }
-        } finally {
-          worker.workload.decrementAndGet()
-        }
-      }
-    }
-    output.toArray
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplits
-   * Return a mapping of host address to list of blocks
-   */
-  private def pruneBlock(
-      table: CarbonTable,
-      columns: Array[String],
-      filter: Expression): JMap[String, JList[Distributable]] = {
-    val jobConf = new JobConf(new Configuration)
-    val job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
-      job, table, columns, filter, null, null)
-
-    // FG pruning is done on the reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
-    val splits = format.getSplits(job)
-    val distributables = splits.asScala.map { split =>
-      split.asInstanceOf[Distributable]
-    }
-    CarbonLoaderUtil.nodeBlockMapping(
-      distributables.asJava,
-      -1,
-      getWorkers.asJava,
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
-      null)
-  }
-
-  /** return the hostnames of all workers */
-  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception thrown when execution times out in search mode
-class ExecutionTimeoutException extends RuntimeException
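
Taken together, Master.search follows a fan-out/collect pattern: all worker requests are
fired asynchronously first, and only then are the results awaited one by one, stopping
early once globalLimit rows have been gathered. A condensed sketch of that shape,
assuming each worker's future yields its rows as a Seq (not the actual SearchResult type):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration

    // Fire-then-collect: the futures are already running; we only block while draining.
    def collect[T](futures: Seq[Future[Seq[T]]], globalLimit: Long, timeout: Duration): Seq[T] = {
      val out = scala.collection.mutable.ArrayBuffer.empty[T]
      for (f <- futures if out.size < globalLimit) {
        val rows = Await.result(f, timeout) // waits for this worker only
        out ++= rows.take((globalLimit - out.size).toInt)
      }
      out.toSeq
    }

Because the RPC is non-blocking, total latency is bounded by the slowest awaited worker
rather than the sum over all workers, and the guard skips waiting on workers whose rows
can no longer fit under the global limit.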