You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/06/25 01:48:42 UTC
[1/3] carbondata git commit: [CARBONDATA-2609] Change RPC
implementation to Hadoop RPC framework
Repository: carbondata
Updated Branches:
refs/heads/carbonstore d5e86db52 -> fa111380f
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
deleted file mode 100644
index 26208d0..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request
- */
-private[rpc] class Scheduler {
- // mapping of worker IP address to worker instance
- private val workers = mutable.Map[String, Schedulable]()
- private val random = new Random()
-
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * Pick a Worker according to the address and workload of the Worker
- * Invoke the RPC and return Future result
- */
- def sendRequestAsync[T: ClassTag](
- splitAddress: String,
- request: Any): (Schedulable, Future[T]) = {
- require(splitAddress != null)
- if (workers.isEmpty) {
- throw new IOException("No worker is available")
- }
- var worker = pickWorker(splitAddress)
-
- // check whether worker exceed max workload, if exceeded, pick next worker
- val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
- var numTry = workers.size
- do {
- if (worker.workload.get() >= maxWorkload) {
- LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
- worker = pickNextWorker(worker)
- numTry = numTry - 1
- } else {
- numTry = -1
- }
- } while (numTry > 0)
- if (numTry == 0) {
- // tried so many times and still not able to find Worker
- throw new WorkerTooBusyException(
- s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
- }
- LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
- val future = worker.ref.ask(request)
- worker.workload.incrementAndGet()
- (worker, future)
- }
-
- private def pickWorker[T: ClassTag](splitAddress: String) = {
- try {
- workers(splitAddress)
- } catch {
- case e: NoSuchElementException =>
- // no local worker available, choose one worker randomly
- pickRandomWorker()
- }
- }
-
- /** pick a worker randomly */
- private def pickRandomWorker() = {
- val index = random.nextInt(workers.size)
- workers.toSeq(index)._2
- }
-
- /** pick the next worker of the input worker in the [[Scheduler.workers]] */
- private def pickNextWorker(worker: Schedulable) = {
- val index = workers.zipWithIndex.find { case ((address, w), index) =>
- w == worker
- }.get._2
- if (index == workers.size - 1) {
- workers.toSeq.head._2
- } else {
- workers.toSeq(index + 1)._2
- }
- }
-
- /** A new searcher is trying to register, add it to the map and connect to this searcher */
- def addWorker(address: String, schedulable: Schedulable): Unit = {
- require(schedulable != null)
- require(address.equals(schedulable.address))
- workers(address) = schedulable
- }
-
- def removeWorker(address: String): Unit = {
- workers.remove(address)
- }
-
- def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represent a Worker which [[Scheduler]] can send
- * Search request on it
- * @param id Worker ID, a UUID string
- * @param cores, number of cores in Worker
- * @param ref RPC endpoint reference
- * @param workload number of outstanding request sent to Worker
- */
-private[rpc] class Schedulable(
- val id: String,
- val address: String,
- val port: Int,
- val cores: Int,
- val ref: RpcEndpointRef,
- var workload: AtomicInteger) {
- def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
- this(id, address, port, cores, ref, new AtomicInteger())
- }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
deleted file mode 100644
index 0f2138a..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-@InterfaceAudience.Internal
-object Worker {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- private val hostAddress = InetAddress.getLocalHost.getHostAddress
- private var port: Int = _
-
- def init(masterHostAddress: String, masterPort: Int): Unit = {
- LOG.info(s"initializing worker...")
- startService()
- LOG.info(s"registering to master $masterHostAddress:$masterPort")
- val workerId = registerToMaster(masterHostAddress, masterPort)
- LOG.info(s"worker registered to master, workerId: $workerId")
- }
-
- /**
- * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
- */
- private def startService(): Unit = {
- new Thread(new Runnable {
- override def run(): Unit = {
- port = CarbonProperties.getSearchWorkerPort
- val conf = new SparkConf()
- var rpcEnv: RpcEnv = null
- var exception: BindException = null
- var numTry = 100 // we will try to create service at worse case 100 times
- do {
- try {
- LOG.info(s"starting search-service on $hostAddress:$port")
- val config = RpcEnvConfig(
- conf, s"worker-$hostAddress", hostAddress, "", port,
- new SecurityManager(conf), clientMode = false)
- rpcEnv = new NettyRpcEnvFactory().create(config)
- numTry = 0
- } catch {
- case e: BindException =>
- // port is occupied, increase the port number and try again
- exception = e
- LOG.error(s"start search-service failed: ${e.getMessage}")
- port = port + 1
- numTry = numTry - 1
- }
- } while (numTry > 0)
- if (rpcEnv == null) {
- // we have tried many times, but still failed to find an available port
- throw exception
- }
- val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
- rpcEnv.setupEndpoint("search-service", searchEndpoint)
- LOG.info("search-service started")
- rpcEnv.awaitTermination()
- }
- }).start()
- }
-
- private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
- LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
- val conf = new SparkConf()
- val config = RpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
- new SecurityManager(conf), clientMode = true)
- val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
-
- val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
- RpcAddress(masterHostAddress, masterPort), "registry-service")
- val cores = Runtime.getRuntime.availableProcessors()
-
- val request = RegisterWorkerRequest(hostAddress, port, cores)
- val future = endPointRef.ask[RegisterWorkerResponse](request)
- ThreadUtils.awaitResult(future, Duration.apply("10s"))
- future.value match {
- case Some(result) =>
- result match {
- case Success(response) =>
- LOG.info("worker registered")
- response.workerId
- case Failure(throwable) =>
- LOG.error(s"worker failed to registered: $throwable")
- throw new IOException(throwable.getMessage)
- }
- case None =>
- LOG.error("worker register timeout")
- throw new ExecutionTimeoutException
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
deleted file mode 100644
index 22e766d..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * Registry service implementation. It adds worker to master.
- */
-class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- override def onStart(): Unit = {
- LOG.info("Registry Endpoint started")
- }
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case req@RegisterWorkerRequest(_, _, _) =>
- val response = master.addWorker(req)
- context.reply(response)
- }
-
- override def onStop(): Unit = {
- LOG.info("Registry Endpoint stopped")
- }
-
-}
-
-case class RegisterWorkerRequest(
- hostAddress: String,
- port: Int,
- cores: Int)
-
-case class RegisterWorkerResponse(
- workerId: String)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
deleted file mode 100644
index 6fbea15..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.SerializableWritable
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.store.worker.SearchRequestHandler
-
-/**
- * Search service implementation
- */
-class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
- override def onStart(): Unit = {
- LOG.info("Searcher Endpoint started")
- }
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case req: SearchRequest =>
- val response = new SearchRequestHandler().handleSearch(req)
- context.reply(response)
-
- case req: ShutdownRequest =>
- val response = new SearchRequestHandler().handleShutdown(req)
- context.reply(response)
-
- }
-
- override def onStop(): Unit = {
- LOG.info("Searcher Endpoint stopped")
- }
-}
-
-// Search request sent from master to worker
-case class SearchRequest(
- searchId: Int,
- split: SerializableWritable[CarbonMultiBlockSplit],
- tableInfo: TableInfo,
- projectColumns: Array[String],
- filterExpression: Expression,
- limit: Long)
-
-// Search result sent from worker to master
-case class SearchResult(
- queryId: Int,
- status: Int,
- message: String,
- rows: Array[Array[Object]])
-
-// Shutdown request sent from master to worker
-case class ShutdownRequest(
- reason: String)
-
-// Shutdown response sent from worker to master
-case class ShutdownResponse(
- status: Int,
- message: String)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
deleted file mode 100644
index 88d925f..0000000
--- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-public class SearchServiceTest {
-// @Test
-// public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
-// Master master = new Master(9999);
-// master.startService();
-//
-// Worker worker = Worker.getInstance();
-// worker.init(InetAddress.getLocalHost().getHostName(), 9999);
-//
-// Set<String> workers = master.getWorkers();
-// Assert.assertEquals(1, workers.size());
-// Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
-//
-// master.stopAllWorkers();
-// master.stopService();
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
deleted file mode 100644
index 8780dc0..0000000
--- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
- var scheduler: Scheduler = _
- var w1: Schedulable = _
- var w2: Schedulable = _
- var w3: Schedulable = _
-
- override def beforeEach(): Unit = {
- scheduler = new Scheduler()
- w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
- w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
- w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
- scheduler.addWorker("1.1.1.1", w1)
- scheduler.addWorker("1.1.1.2", w2)
- scheduler.addWorker("1.1.1.3", w3)
- }
-
- test("test addWorker, removeWorker, getAllWorkers") {
- assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
- scheduler.removeWorker("1.1.1.2")
- assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
- val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
- scheduler.addWorker("1.1.1.4", w4)
- assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
- assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
- }
-
- test("test normal schedule") {
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- assertResult(w1.id)(r1.id)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- assertResult(w2.id)(r2.id)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- assertResult(w3.id)(r3.id)
- val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- assertResult(w1.id)(r4.id)
- val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- assertResult(w2.id)(r5.id)
- val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- assertResult(w3.id)(r6.id)
- }
-
- test("test worker unavailable") {
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
- assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
- }
-
- test("test reschedule when target worker is overload") {
- // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
- (1 to 40).foreach { i =>
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
- val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- // it must be worker1 since worker3 exceed max workload
- assertResult(w1.id)(r.id)
- }
-
- test("test all workers are overload") {
- // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
- (1 to 40).foreach { i =>
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- val e = intercept[WorkerTooBusyException] {
- scheduler.sendRequestAsync("1.1.1.3", null)
- }
- }
-
- test("test user configured overload param") {
- val original = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
- (1 to 3).foreach { i =>
- val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
- val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
- val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- val e = intercept[WorkerTooBusyException] {
- scheduler.sendRequestAsync("1.1.1.3", null)
- }
-
- if (original != null) {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
- }
- }
-
- test("test invalid property") {
- intercept[IllegalArgumentException] {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
- }
- var value = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
- assertResult(null)(value)
-
- intercept[NumberFormatException] {
- CarbonProperties.getInstance().addProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
- }
- value = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
- assertResult(null)(value)
- }
-}
-
-class DummyRef extends RpcEndpointRef(new SparkConf) {
- override def address: RpcAddress = null
-
- override def name: String = ""
-
- override def send(message: Any): Unit = { }
-
- override def ask[T](message: Any, timeout: RpcTimeout)
- (implicit evidence$1: ClassTag[T]): Future[T] = null
-}
\ No newline at end of file
[3/3] carbondata git commit: [CARBONDATA-2609] Change RPC
implementation to Hadoop RPC framework
Posted by qi...@apache.org.
[CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework
This closes #2372
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa111380
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa111380
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa111380
Branch: refs/heads/carbonstore
Commit: fa111380fc46c955330bc47b802844e576b6524f
Parents: d5e86db
Author: Jacky Li <ja...@qq.com>
Authored: Wed Jun 13 23:57:00 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Jun 25 09:46:17 2018 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 +-
.../carbondata/core/scan/model/QueryModel.java | 14 +-
.../carbondata/core/util/CarbonProperties.java | 10 +
.../core/util/ObjectSerializationUtil.java | 14 +
.../carbondata/hadoop/CarbonRecordReader.java | 8 +-
.../detailquery/SearchModeTestCase.scala | 17 +-
integration/spark2/pom.xml | 2 +-
.../carbondata/store/SparkCarbonStore.scala | 27 +-
.../org/apache/spark/sql/CarbonSession.scala | 1 +
pom.xml | 2 +-
store/core/pom.xml | 113 +++++++
.../carbondata/store/CarbonRowReadSupport.java | 53 ++++
.../apache/carbondata/store/CarbonStore.java | 68 +++++
.../carbondata/store/LocalCarbonStore.java | 130 +++++++++
.../carbondata/store/MetaCachedCarbonStore.java | 59 ++++
.../carbondata/store/rpc/QueryService.java | 33 +++
.../carbondata/store/rpc/RegistryService.java | 30 ++
.../carbondata/store/rpc/ServiceFactory.java | 43 +++
.../store/rpc/impl/IndexedRecordReader.java | 161 ++++++++++
.../store/rpc/impl/QueryServiceImpl.java | 56 ++++
.../store/rpc/impl/RegistryServiceImpl.java | 54 ++++
.../store/rpc/impl/RequestHandler.java | 147 ++++++++++
.../carbondata/store/rpc/impl/Status.java | 28 ++
.../store/rpc/model/QueryRequest.java | 108 +++++++
.../store/rpc/model/QueryResponse.java | 84 ++++++
.../store/rpc/model/RegisterWorkerRequest.java | 69 +++++
.../store/rpc/model/RegisterWorkerResponse.java | 54 ++++
.../store/rpc/model/ShutdownRequest.java | 53 ++++
.../store/rpc/model/ShutdownResponse.java | 61 ++++
.../org/apache/carbondata/store/Master.scala | 283 ++++++++++++++++++
.../org/apache/carbondata/store/Scheduler.scala | 147 ++++++++++
.../org/apache/carbondata/store/Worker.scala | 113 +++++++
.../carbondata/store/LocalCarbonStoreTest.java | 72 +++++
.../org/apache/carbondata/store/TestUtil.java | 168 +++++++++++
.../carbondata/store/SchedulerSuite.scala | 155 ++++++++++
.../carbondata/store/CarbonRowReadSupport.java | 53 ----
.../apache/carbondata/store/CarbonStore.java | 68 -----
.../carbondata/store/LocalCarbonStore.java | 130 ---------
.../carbondata/store/MetaCachedCarbonStore.java | 59 ----
.../carbondata/store/LocalCarbonStoreTest.java | 72 -----
store/search/pom.xml | 112 -------
.../store/worker/SearchRequestHandler.java | 247 ----------------
.../apache/carbondata/store/worker/Status.java | 28 --
.../scala/org/apache/spark/rpc/Master.scala | 291 -------------------
.../scala/org/apache/spark/rpc/Scheduler.scala | 139 ---------
.../scala/org/apache/spark/rpc/Worker.scala | 118 --------
.../org/apache/spark/search/Registry.scala | 51 ----
.../org/apache/spark/search/Searcher.scala | 79 -----
.../carbondata/store/SearchServiceTest.java | 37 ---
.../org/apache/spark/rpc/SchedulerSuite.scala | 154 ----------
50 files changed, 2402 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 118ff28..ff6b358 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1725,7 +1725,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
/**
- * It's timeout threshold of carbon search query
+ * It's timeout threshold of carbon search query, in seconds
*/
@CarbonProperty
@InterfaceStability.Unstable
@@ -1734,7 +1734,7 @@ public final class CarbonCommonConstants {
/**
* Default value is 10 seconds
*/
- public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s";
+ public static final int CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = 10;
/**
* The size of thread pool used for reading files in Work for search mode. By default,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 55dafb9..b15ce02 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -69,6 +69,7 @@ public class QueryModel {
* table block information in which query will be executed
*/
private List<TableBlockInfo> tableBlockInfos;
+
/**
* To handle most of the computation in query engines like spark and hive, carbon should give
* raw detailed records to it.
@@ -109,11 +110,6 @@ public class QueryModel {
*/
private boolean requiredRowId;
- /**
- * whether it is FG with search mode
- */
- private boolean isFG;
-
private QueryModel(CarbonTable carbonTable) {
tableBlockInfos = new ArrayList<TableBlockInfo>();
invalidSegmentIds = new ArrayList<>();
@@ -375,14 +371,6 @@ public class QueryModel {
this.requiredRowId = requiredRowId;
}
- public boolean isFG() {
- return isFG;
- }
-
- public void setFG(boolean FG) {
- isFG = FG;
- }
-
@Override
public String toString() {
return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index dc50ab0..574d175 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1599,4 +1599,14 @@ public final class CarbonProperties {
}
return storageLevel.toUpperCase();
}
+
+ public int getQueryTimeout() {
+ try {
+ return Integer.parseInt(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT));
+ } catch (NumberFormatException e) {
+ return CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
index 020787d..e133208 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
@@ -113,4 +113,18 @@ public class ObjectSerializationUtil {
}
}
+ public static byte[] serialize(Object object) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(object);
+ return baos.toByteArray();
+ }
+
+ public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+ if (bytes == null) {
+ return null;
+ }
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+ return ois.readObject();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 6b56382..3a0037f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -78,12 +78,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
} else {
throw new RuntimeException("unsupported input split type: " + inputSplit);
}
- // It should use the exists tableBlockInfos if tableBlockInfos of queryModel is not empty
- // otherwise the prune is no use before this method
- if (!queryModel.isFG()) {
- List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
- queryModel.setTableBlockInfos(tableBlockInfoList);
- }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
try {
carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 001f6c0..af9e50f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -60,6 +60,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
}
test("SearchMode Query: row result") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
checkSearchAnswer("select * from main where city = 'city3'")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
@@ -67,36 +68,44 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
}
test("SearchMode Query: vector result") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select * from main where city = 'city3'")
}
test("equal filter") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where id = '100'")
checkSearchAnswer("select id from main where planet = 'planet100'")
}
test("greater and less than filter") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where m2 < 4")
}
test("IN filter") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where id IN ('40', '50', '60')")
}
test("expression filter") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where length(id) < 2")
}
test("filter with limit") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where id = '3' limit 10")
checkSearchAnswer("select id from main where length(id) < 2 limit 10")
}
test("aggregate query") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city")
}
test("aggregate query with datamap and fallback to SparkSQL") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
checkSearchAnswer("select city, count(*) from main group by city")
sql("drop datamap preagg on table main").show()
@@ -108,10 +117,11 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
checkSearchAnswer("select id from main where id = '3' limit 10")
sql("set carbon.search.enabled = false")
assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
+ sql("set carbon.search.enabled = true")
}
test("test lucene datamap with search mode") {
- sql("set carbon.search.enabled = true")
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
sql("DROP DATAMAP IF EXISTS dm ON TABLE main")
sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ")
checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
@@ -120,6 +130,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test lucene datamap with search mode 2") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
sql("drop datamap if exists dm3 ON TABLE main")
sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ")
checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
@@ -128,6 +139,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test lucene datamap with search mode, two column") {
+ assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
sql("drop datamap if exists dm3 ON TABLE main")
sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ")
checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
@@ -137,7 +149,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP DATAMAP if exists dm3 ON TABLE main")
}
- test("start search mode twice") {
+ ignore("start search mode twice") {
sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where id = '3' limit 10")
@@ -148,6 +160,5 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
checkSearchAnswer("select id from main where id = '3' limit 10")
- sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9b9e71d..6b05800 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -41,7 +41,7 @@
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-store-sdk</artifactId>
+ <artifactId>carbondata-store-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index 3a6adea..d99081d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -23,7 +23,6 @@ import java.net.InetAddress
import scala.collection.JavaConverters._
import org.apache.spark.{CarbonInputMetrics, SparkConf}
-import org.apache.spark.rpc.{Master, Worker}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.SparkSession
@@ -111,24 +110,26 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
def startSearchMode(): Unit = {
LOG.info("Starting search mode master")
- master = new Master(session.sparkContext.getConf)
+ master = new Master()
master.startService()
startAllWorkers()
}
def stopSearchMode(): Unit = {
- LOG.info("Shutting down all workers...")
- try {
- master.stopAllWorkers()
- LOG.info("All workers are shutted down")
- } catch {
- case e: Exception =>
- LOG.error(s"failed to shutdown worker: ${e.toString}")
+ if (master != null) {
+ LOG.info("Shutting down all workers...")
+ try {
+ master.stopAllWorkers()
+ LOG.info("All workers are shut down")
+ } catch {
+ case e: Exception =>
+ LOG.error(s"failed to shutdown worker: ${ e.toString }")
+ }
+ LOG.info("Stopping master...")
+ master.stopService()
+ LOG.info("Master stopped")
+ master = null
}
- LOG.info("Stopping master...")
- master.stopService()
- LOG.info("Master stopped")
- master = null
}
/** search mode */
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 30cb464..7fdba89 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -214,6 +214,7 @@ class CarbonSession(@transient val sc: SparkContext,
case e: RuntimeException =>
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
.error(s"Stop search mode failed: ${e.getMessage}")
+ throw e
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1413fd1..5e81f2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
<module>integration/spark-common-test</module>
<module>datamap/examples</module>
<module>store/sdk</module>
- <module>store/search</module>
+ <module>store/core</module>
<module>assembly</module>
</modules>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
new file mode 100644
index 0000000..0bee84f
--- /dev/null
+++ b/store/core/pom.xml
@@ -0,0 +1,113 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-store-core</artifactId>
+ <name>Apache CarbonData :: Store Core </name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
new file mode 100644
index 0000000..bafbb9f
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+
+/**
+ * ReadSupport that convert row object to CarbonRow
+ */
+@InterfaceAudience.Internal
+public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
+ private CarbonReadSupport<Object[]> delegate;
+
+ public CarbonRowReadSupport() {
+ this.delegate = new DictionaryDecodeReadSupport<>();
+ }
+
+ @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
+ throws IOException {
+ delegate.initialize(carbonColumns, carbonTable);
+ }
+
+ @Override public CarbonRow readRow(Object[] data) {
+ Object[] converted = delegate.readRow(data);
+ return new CarbonRow(converted);
+ }
+
+ @Override public void close() {
+ delegate.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
new file mode 100644
index 0000000..c6b2fb8
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.scan.expression.Expression;
+
+/**
+ * User can use {@link CarbonStore} to query data
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends Closeable {
+
+ /**
+ * Scan query on the data in the table path
+ * @param path table path
+ * @param projectColumns column names to read
+ * @return rows
+ * @throws IOException if unable to read files in table path
+ */
+ Iterator<CarbonRow> scan(
+ String path,
+ String[] projectColumns) throws IOException;
+
+ /**
+ * Scan query with filter, on the data in the table path
+ * @param path table path
+ * @param projectColumns column names to read
+ * @param filter filter condition, can be null
+ * @return rows that satisfy filter condition
+ * @throws IOException if unable to read files in table path
+ */
+ Iterator<CarbonRow> scan(
+ String path,
+ String[] projectColumns,
+ Expression filter) throws IOException;
+
+ /**
+ * SQL query, table should be created before calling this function
+ * @param sqlString SQL statement
+ * @return rows
+ * @throws IOException if unable to read files in table path
+ */
+ Iterator<CarbonRow> sql(String sqlString) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
new file mode 100644
index 0000000..daa1447
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * A CarbonStore implementation that works locally, without other compute framework dependency.
+ * It can be used to read data in local disk.
+ *
+ * Note that this class is experimental, it is not intended to be used in production.
+ */
+@InterfaceAudience.Internal
+class LocalCarbonStore extends MetaCachedCarbonStore {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
+
+ @Override
+ public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException {
+ return scan(path, projectColumns, null);
+ }
+
+ @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
+ throws IOException {
+ Objects.requireNonNull(path);
+ Objects.requireNonNull(projectColumns);
+
+ CarbonTable table = getTable(path);
+ if (table.isStreamingSink() || table.isHivePartitionTable()) {
+ throw new UnsupportedOperationException("streaming and partition table is not supported");
+ }
+ // TODO: use InputFormat to prune data and read data
+
+ final CarbonTableInputFormat format = new CarbonTableInputFormat();
+ final Job job = new Job(new Configuration());
+ CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo());
+ CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath());
+ CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
+ CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
+ CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
+ CarbonInputFormat
+ .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
+ if (filter != null) {
+ CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+ }
+
+ final List<InputSplit> splits =
+ format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+
+ List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
+
+ List<CarbonRow> rows = new ArrayList<>();
+
+ try {
+ for (InputSplit split : splits) {
+ TaskAttemptContextImpl attempt =
+ new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader reader = format.createRecordReader(split, attempt);
+ reader.initialize(split, attempt);
+ readers.add(reader);
+ }
+
+ for (RecordReader<Void, Object> reader : readers) {
+ while (reader.nextKeyValue()) {
+ rows.add((CarbonRow) reader.getCurrentValue());
+ }
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOGGER.error(e);
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ for (RecordReader<Void, Object> reader : readers) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOGGER.error(e);
+ }
+ }
+ }
+ return rows.iterator();
+ }
+
+ @Override
+ public Iterator<CarbonRow> sql(String sqlString) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
new file mode 100644
index 0000000..e43f750
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * A CarbonStore base class that caches CarbonTable object
+ */
+@InterfaceAudience.Internal
+abstract class MetaCachedCarbonStore implements CarbonStore {
+
+ // mapping of table path to CarbonTable object
+ private Map<String, CarbonTable> cache = new HashMap<>();
+
+ CarbonTable getTable(String path) throws IOException {
+ if (cache.containsKey(path)) {
+ return cache.get(path);
+ }
+ org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+ .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
+ tableInfo1.setTablePath(path);
+ CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1);
+ cache.put(path, table);
+ return table;
+ }
+
+ @Override
+ public void close() throws IOException {
+ cache.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
new file mode 100644
index 0000000..faaa746
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface QueryService extends VersionedProtocol {
+ long versionID = 1L;
+ QueryResponse query(QueryRequest request);
+ ShutdownResponse shutdown(ShutdownRequest request);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
new file mode 100644
index 0000000..4d17686
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface RegistryService extends VersionedProtocol {
+ long versionID = 1L;
+ RegisterWorkerResponse registerWorker(RegisterWorkerRequest request);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
new file mode 100644
index 0000000..a50ab8b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+@InterfaceAudience.Internal
+public class ServiceFactory {
+
+ public static QueryService createSearchService(String host, int port) throws IOException {
+ InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
+ return RPC.getProxy(
+ QueryService.class, QueryService.versionID, address, new Configuration());
+ }
+
+ public static RegistryService createRegistryService(String host, int port) throws IOException {
+ InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
+ return RPC.getProxy(
+ RegistryService.class, RegistryService.versionID, address, new Configuration());
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
new file mode 100644
index 0000000..2c768d1
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a special RecordReader that leverages FGDataMap before reading carbondata file
+ * and return CarbonRow object
+ */
+public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
+
+ private static final LogService LOG =
+ LogServiceFactory.getLogService(RequestHandler.class.getName());
+
+ private int queryId;
+ private CarbonTable table;
+
+ public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
+ super(queryModel, new CarbonRowReadSupport());
+ this.queryId = queryId;
+ this.table = table;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit;
+ List<CarbonInputSplit> splits = mbSplit.getAllSplits();
+ List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
+ queryModel.setTableBlockInfos(list);
+
+ // prune the block with FGDataMap is there is one based on the filter condition
+ DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
+ queryModel.getFilterExpressionResolverTree());
+ if (fgDataMap != null) {
+ queryModel = prune(table, queryModel, mbSplit, fgDataMap);
+ } else {
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splits);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ }
+
+ readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+ try {
+ carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
+ } catch (QueryExecutionException e) {
+ throw new InterruptedException(e.getMessage());
+ }
+ }
+
+ private DataMapExprWrapper chooseFGDataMap(
+ CarbonTable table,
+ FilterResolverIntf filterInterface) {
+ DataMapChooser chooser = null;
+ try {
+ chooser = new DataMapChooser(table);
+ return chooser.chooseFGDataMap(filterInterface);
+ } catch (IOException e) {
+ LOG.error(e);
+ return null;
+ }
+ }
+
+ /**
+ * If there is FGDataMap defined for this table and filter condition in the query,
+ * prune the splits by the DataMap and set the pruned split into the QueryModel and return
+ */
+ private QueryModel prune(CarbonTable table, QueryModel queryModel,
+ CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
+ Objects.requireNonNull(datamap);
+ List<Segment> segments = new LinkedList<>();
+ HashMap<String, Integer> uniqueSegments = new HashMap<>();
+ LoadMetadataDetails[] loadMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
+ for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+ String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
+ if (uniqueSegments.get(segmentId) == null) {
+ segments.add(Segment.toSegment(segmentId,
+ new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+ loadMetadataDetails)));
+ uniqueSegments.put(segmentId, 1);
+ } else {
+ uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+ }
+ }
+
+ List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+ List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
+ for (int i = 0; i < distributables.size(); i++) {
+ DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+ prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
+ }
+
+ HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+ for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
+ pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
+ }
+
+ List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+ List<TableBlockInfo> blockToRead = new LinkedList<>();
+ for (TableBlockInfo block : blocks) {
+ if (pathToRead.keySet().contains(block.getFilePath())) {
+ // If not set this, it won't create FineGrainBlocklet object in
+ // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
+ block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
+ blockToRead.add(block);
+ }
+ }
+ LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
+ blockToRead.size()));
+ queryModel.setTableBlockInfos(blockToRead);
+ return queryModel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
new file mode 100644
index 0000000..b191331
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.QueryService;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class QueryServiceImpl implements QueryService {
+
+ @Override
+ public QueryResponse query(QueryRequest request) {
+ RequestHandler handler = new RequestHandler();
+ return handler.handleSearch(request);
+ }
+
+ @Override
+ public ShutdownResponse shutdown(ShutdownRequest request) {
+ RequestHandler handler = new RequestHandler();
+ return handler.handleShutdown(request);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return versionID;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
new file mode 100644
index 0000000..12f48ba
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.Master;
+import org.apache.carbondata.store.rpc.RegistryService;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class RegistryServiceImpl implements RegistryService {
+
+ private Master master;
+
+ public RegistryServiceImpl(Master master) {
+ this.master = master;
+ }
+
+ @Override
+ public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) {
+ return master.addWorker(request);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return versionID;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
new file mode 100644
index 0000000..29ee546
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+/**
+ * It handles request from master.
+ */
+@InterfaceAudience.Internal
+class RequestHandler {
+
+ private static final LogService LOG =
+ LogServiceFactory.getLogService(RequestHandler.class.getName());
+
+ QueryResponse handleSearch(QueryRequest request) {
+ try {
+ LOG.info(String.format("[QueryId:%d] receive search request", request.getRequestId()));
+ List<CarbonRow> rows = handleRequest(request);
+ LOG.info(String.format("[QueryId:%d] sending success response", request.getRequestId()));
+ return createSuccessResponse(request, rows);
+ } catch (IOException e) {
+ LOG.error(e);
+ LOG.info(String.format("[QueryId:%d] sending failure response", request.getRequestId()));
+ return createFailureResponse(request, e);
+ }
+ }
+
+ ShutdownResponse handleShutdown(ShutdownRequest request) {
+ LOG.info("Shutting down worker...");
+ SearchModeDetailQueryExecutor.shutdownThreadPool();
+ SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+ LOG.info("Worker shut down");
+ return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+ }
+
+ /**
+ * Builds {@link QueryModel} and read data from files
+ */
+ private List<CarbonRow> handleRequest(QueryRequest request) throws IOException {
+ CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+ carbonTaskInfo.setTaskId(System.nanoTime());
+ ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
+ CarbonMultiBlockSplit mbSplit = request.getSplit();
+ long limit = request.getLimit();
+ TableInfo tableInfo = request.getTableInfo();
+ CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+ QueryModel queryModel = createQueryModel(table, request);
+
+ LOG.info(String.format("[QueryId:%d] %s, number of block: %d",
+ request.getRequestId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+
+ // read all rows by the reader
+ List<CarbonRow> rows = new LinkedList<>();
+ try (CarbonRecordReader<CarbonRow> reader =
+ new IndexedRecordReader(request.getRequestId(), table, queryModel)) {
+ reader.initialize(mbSplit, null);
+
+ // loop to read required number of rows.
+ // By default, if user does not specify the limit value, limit is Long.MaxValue
+ long rowCount = 0;
+ while (reader.nextKeyValue() && rowCount < limit) {
+ rows.add(reader.getCurrentValue());
+ rowCount++;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ LOG.info(String.format("[QueryId:%d] scan completed, return %d rows",
+ request.getRequestId(), rows.size()));
+ return rows;
+ }
+
+
+
+ private QueryModel createQueryModel(CarbonTable table, QueryRequest request) {
+ String[] projectColumns = request.getProjectColumns();
+ Expression filter = null;
+ if (request.getFilterExpression() != null) {
+ filter = request.getFilterExpression();
+ }
+ return new QueryModelBuilder(table)
+ .projectColumns(projectColumns)
+ .filterExpression(filter)
+ .build();
+ }
+
+ /**
+ * create a failure response
+ */
+ private QueryResponse createFailureResponse(QueryRequest request, Throwable throwable) {
+ return new QueryResponse(request.getRequestId(), Status.FAILURE.ordinal(),
+ throwable.getMessage(), new Object[0][]);
+ }
+
+ /**
+ * create a success response with result rows
+ */
+ private QueryResponse createSuccessResponse(QueryRequest request, List<CarbonRow> rows) {
+ Iterator<CarbonRow> itor = rows.iterator();
+ Object[][] output = new Object[rows.size()][];
+ int i = 0;
+ while (itor.hasNext()) {
+ output[i++] = itor.next().getData();
+ }
+ return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
new file mode 100644
index 0000000..9bcd397
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Status of RPC response
+ */
+@InterfaceAudience.Internal
+public enum Status {
+ SUCCESS, FAILURE
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
new file mode 100644
index 0000000..27dc38b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class QueryRequest implements Serializable, Writable {
+ private int requestId;
+ private CarbonMultiBlockSplit split;
+ private TableInfo tableInfo;
+ private String[] projectColumns;
+ private Expression filterExpression;
+ private long limit;
+
+ public QueryRequest() {
+ }
+
+ public QueryRequest(int requestId, CarbonMultiBlockSplit split,
+ TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) {
+ this.requestId = requestId;
+ this.split = split;
+ this.tableInfo = tableInfo;
+ this.projectColumns = projectColumns;
+ this.filterExpression = filterExpression;
+ this.limit = limit;
+ }
+
+ public int getRequestId() {
+ return requestId;
+ }
+
+ public CarbonMultiBlockSplit getSplit() {
+ return split;
+ }
+
+ public TableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ public String[] getProjectColumns() {
+ return projectColumns;
+ }
+
+ public Expression getFilterExpression() {
+ return filterExpression;
+ }
+
+ public long getLimit() {
+ return limit;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(requestId);
+ split.write(out);
+ tableInfo.write(out);
+ out.writeInt(projectColumns.length);
+ for (String projectColumn : projectColumns) {
+ out.writeUTF(projectColumn);
+ }
+ String filter = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ out.writeUTF(filter);
+ out.writeLong(limit);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ requestId = in.readInt();
+ split = new CarbonMultiBlockSplit();
+ split.readFields(in);
+ tableInfo = new TableInfo();
+ tableInfo.readFields(in);
+ projectColumns = new String[in.readInt()];
+ for (int i = 0; i < projectColumns.length; i++) {
+ projectColumns[i] = in.readUTF();
+ }
+ String filter = in.readUTF();
+ filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter);
+ limit = in.readLong();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
new file mode 100644
index 0000000..033f1a5
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Internal
+public class QueryResponse implements Serializable, Writable {
+ private int queryId;
+ private int status;
+ private String message;
+ private Object[][] rows;
+
+ public QueryResponse() {
+ }
+
+ public QueryResponse(int queryId, int status, String message, Object[][] rows) {
+ this.queryId = queryId;
+ this.status = status;
+ this.message = message;
+ this.rows = rows;
+ }
+
+ public int getQueryId() {
+ return queryId;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public Object[][] getRows() {
+ return rows;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(queryId);
+ out.writeInt(status);
+ out.writeUTF(message);
+ WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows));
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ queryId = in.readInt();
+ status = in.readInt();
+ message = in.readUTF();
+ try {
+ rows = (Object[][])ObjectSerializationUtil.deserialize(
+ WritableUtils.readCompressedByteArray(in));
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
new file mode 100644
index 0000000..894948b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerRequest implements Serializable, Writable {
+ private String hostAddress;
+ private int port;
+ private int cores;
+
+ public RegisterWorkerRequest() {
+ }
+
+ public RegisterWorkerRequest(String hostAddress, int port, int cores) {
+ this.hostAddress = hostAddress;
+ this.port = port;
+ this.cores = cores;
+ }
+
+ public String getHostAddress() {
+ return hostAddress;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(hostAddress);
+ out.writeInt(port);
+ out.writeInt(cores);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ hostAddress = in.readUTF();
+ port = in.readInt();
+ cores = in.readInt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
new file mode 100644
index 0000000..8465c90
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerResponse implements Serializable, Writable {
+
+ private String workerId;
+
+ public RegisterWorkerResponse() {
+ }
+
+ public RegisterWorkerResponse(String workerId) {
+ this.workerId = workerId;
+ }
+
+ public String getWorkerId() {
+ return workerId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(workerId);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ workerId = in.readUTF();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
new file mode 100644
index 0000000..7a25944
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownRequest implements Serializable, Writable {
+ private String reason;
+
+ public ShutdownRequest() {
+ }
+
+ public ShutdownRequest(String reason) {
+ this.reason = reason;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(reason);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ reason = in.readUTF();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
new file mode 100644
index 0000000..f6f329f
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownResponse implements Serializable, Writable {
+ private int status;
+ private String message;
+
+ public ShutdownResponse() {
+ }
+
+ public ShutdownResponse(int status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(status);
+ out.writeUTF(message);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ status = in.readInt();
+ message = in.readUTF();
+ }
+}
[2/3] carbondata git commit: [CARBONDATA-2609] Change RPC
implementation to Hadoop RPC framework
Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
new file mode 100644
index 0000000..2109251
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.net.{BindException, InetAddress}
+import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
+import java.util.concurrent.{ExecutionException, Future, TimeoutException, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.ipc.RPC
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.store.rpc.{RegistryService, ServiceFactory}
+import org.apache.carbondata.store.rpc.impl.{RegistryServiceImpl, Status}
+import org.apache.carbondata.store.rpc.model._
+
+/**
+ * Master of CarbonSearch.
+ * It provides a Registry service for worker to register.
+ * And it provides search API to fire RPC call to workers.
+ */
+@InterfaceAudience.Internal
+private[store] class Master {
+ private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ // worker host address map to EndpointRef
+
+ private val random = new Random
+
+ private var registryServer: RPC.Server = _
+
+ private val scheduler: Scheduler = new Scheduler
+
+ def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
+ val hadoopConf = FileFactory.getConfiguration
+ val builder = new RPC.Builder(hadoopConf)
+ builder
+ .setBindAddress(serverHost)
+ .setPort(serverPort)
+ .setProtocol(classOf[RegistryService])
+ .setInstance(new RegistryServiceImpl(this))
+ .build
+ }
+
+ /** start service and listen on port passed in constructor */
+ def startService(): Unit = {
+ if (registryServer == null) {
+ LOG.info("Start search mode master thread")
+ val isStarted: AtomicBoolean = new AtomicBoolean(false)
+ new Thread(new Runnable {
+ override def run(): Unit = {
+ val hostAddress = InetAddress.getLocalHost.getHostAddress
+ var port = CarbonProperties.getSearchMasterPort
+ var exception: BindException = null
+ var numTry = 100 // we will try to create service at worse case 100 times
+ do {
+ try {
+ LOG.info(s"building registry-service on $hostAddress:$port")
+ registryServer = buildServer(hostAddress, port)
+ numTry = 0
+ } catch {
+ case e: BindException =>
+ // port is occupied, increase the port number and try again
+ exception = e
+ LOG.error(s"start registry-service failed: ${e.getMessage}")
+ port = port + 1
+ numTry = numTry - 1
+ }
+ } while (numTry > 0)
+ if (registryServer == null) {
+ // we have tried many times, but still failed to find an available port
+ throw exception
+ }
+ if (isStarted.compareAndSet(false, false)) {
+ synchronized {
+ isStarted.compareAndSet(false, true)
+ }
+ }
+ LOG.info("starting registry-service")
+ registryServer.start()
+ LOG.info("registry-service started")
+ }
+ }).start()
+ var count = 0
+ val countThreshold = 5000
+ while (isStarted.compareAndSet(false, false) && count < countThreshold) {
+ LOG.info(s"Waiting search mode master to start, retrying $count times")
+ Thread.sleep(10)
+ count = count + 1
+ }
+ if (count >= countThreshold) {
+ LOG.error(s"Search mode try $countThreshold times to start master but failed")
+ throw new RuntimeException(
+ s"Search mode try $countThreshold times to start master but failed")
+ } else {
+ LOG.info("Search mode master started")
+ }
+ } else {
+ LOG.info("Search mode master has already started")
+ }
+ }
+
+ def stopService(): Unit = {
+ if (registryServer != null) {
+ registryServer.stop()
+ registryServer.join()
+ registryServer = null
+ }
+ }
+
+ def stopAllWorkers(): Unit = {
+ scheduler.getAllWorkers.toSeq.foreach { case (address, schedulable) =>
+ val response = try {
+ schedulable.service.shutdown(new ShutdownRequest("user"))
+ } catch {
+ case throwable: Throwable =>
+ throw new IOException(throwable)
+ }
+ scheduler.removeWorker(address)
+ }
+ }
+
+ /** A new searcher is trying to register, add it to the map and connect to this searcher */
+ def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
+ LOG.info(s"Receive Register request from worker ${request.getHostAddress}:${request.getPort} " +
+ s"with ${request.getCores} cores")
+ val workerId = UUID.randomUUID().toString
+ val workerAddress = request.getHostAddress
+ val workerPort = request.getPort
+ LOG.info(s"connecting to worker ${request.getHostAddress}:${request.getPort}, " +
+ s"workerId $workerId")
+
+ val searchService = ServiceFactory.createSearchService(workerAddress, workerPort)
+ scheduler.addWorker(workerAddress,
+ new Schedulable(workerId, workerAddress, workerPort, request.getCores, searchService))
+ LOG.info(s"worker ${request.getHostAddress}:${request.getPort} registered")
+ new RegisterWorkerResponse(workerId)
+ }
+
+ /**
+ * Execute search by firing RPC call to worker, return the result rows
+ * @param table table to search
+ * @param columns projection column names
+ * @param filter filter expression
+ * @param globalLimit max number of rows required in Master
+ * @param localLimit max number of rows required in Worker
+ * @return
+ */
+ def search(table: CarbonTable, columns: Array[String], filter: Expression,
+ globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
+ Objects.requireNonNull(table)
+ Objects.requireNonNull(columns)
+ if (globalLimit < 0 || localLimit < 0) {
+ throw new IllegalArgumentException("limit should be positive")
+ }
+
+ val queryId = random.nextInt
+ var rowCount = 0
+ val output = new ArrayBuffer[CarbonRow]
+
+ def onSuccess(result: QueryResponse): Unit = {
+ // in case of RPC success, collect all rows in response message
+ if (result.getQueryId != queryId) {
+ throw new IOException(
+ s"queryId in response does not match request: ${result.getQueryId} != $queryId")
+ }
+ if (result.getStatus != Status.SUCCESS.ordinal()) {
+ throw new IOException(s"failure in worker: ${ result.getMessage }")
+ }
+
+ val itor = result.getRows.iterator
+ while (itor.hasNext && rowCount < globalLimit) {
+ output += new CarbonRow(itor.next())
+ rowCount = rowCount + 1
+ }
+ LOG.info(s"[QueryId:$queryId] accumulated result size $rowCount")
+ }
+ def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
+ def onTimedout() = throw new ExecutionTimeoutException()
+
+ // prune data and get a mapping of worker hostname to list of blocks,
+ // then add these blocks to the QueryRequest and fire the RPC call
+ val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
+ val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
+ // Build a QueryRequest
+ val split = new CarbonMultiBlockSplit(blocks, splitAddress)
+ val request =
+ new QueryRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
+
+ // Find an Endpoind and send the request to it
+ // This RPC is non-blocking so that we do not need to wait before send to next worker
+ scheduler.sendRequestAsync(splitAddress, request)
+ }
+
+ // loop to get the result of each Worker
+ tuple.foreach { case (worker: Schedulable, future: Future[QueryResponse]) =>
+
+ // if we have enough data already, we do not need to collect more result
+ if (rowCount < globalLimit) {
+ // wait for worker
+ val response = try {
+ future.get(CarbonProperties.getInstance().getQueryTimeout.toLong, TimeUnit.SECONDS)
+ } catch {
+ case e: ExecutionException => onFaiure(e)
+ case t: TimeoutException => onTimedout()
+ } finally {
+ worker.workload.decrementAndGet()
+ }
+ LOG.info(s"[QueryId:$queryId] receive search response from worker " +
+ s"${worker.address}:${worker.port}")
+ onSuccess(response)
+ }
+ }
+ output.toArray
+ }
+
+ /**
+ * Prune data by using CarbonInputFormat.getSplit
+ * Return a mapping of host address to list of block
+ */
+ private def pruneBlock(
+ table: CarbonTable,
+ columns: Array[String],
+ filter: Expression): JMap[String, JList[Distributable]] = {
+ val jobConf = new JobConf(new Configuration)
+ val job = new Job(jobConf)
+ val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+ job, table, columns, filter, null, null)
+
+ // We will do FG pruning in reader side, so don't do it here
+ CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
+ val splits = format.getSplits(job)
+ val distributables = splits.asScala.map { split =>
+ split.asInstanceOf[Distributable]
+ }
+ CarbonLoaderUtil.nodeBlockMapping(
+ distributables.asJava,
+ -1,
+ getWorkers.asJava,
+ CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
+ null)
+ }
+
+ /** return hostname of all workers */
+ def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
+}
+
+// Exception if execution timed out in search mode
+class ExecutionTimeoutException extends RuntimeException
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
new file mode 100644
index 0000000..fb3ef86
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.util.concurrent.{Callable, Executors, Future}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.QueryService
+import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse}
+
+/**
+ * [[Master]] uses Scheduler to pick a Worker to send request
+ */
+@InterfaceAudience.Internal
+private[store] class Scheduler {
+ // mapping of worker IP address to worker instance
+ private val workers = mutable.Map[String, Schedulable]()
+ private val random = new Random()
+ private val executors = Executors.newCachedThreadPool()
+ private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Pick a Worker according to the address and workload of the Worker
+ * Invoke the RPC and return Future result
+ */
+ def sendRequestAsync(
+ splitAddress: String,
+ request: QueryRequest): (Schedulable, Future[QueryResponse]) = {
+ require(splitAddress != null)
+ if (workers.isEmpty) {
+ throw new IOException("No worker is available")
+ }
+ var worker: Schedulable = pickWorker(splitAddress)
+
+ // check whether worker exceed max workload, if exceeded, pick next worker
+ val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
+ var numTry = workers.size
+ do {
+ if (worker.workload.get() >= maxWorkload) {
+ LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
+ worker = pickNextWorker(worker)
+ numTry = numTry - 1
+ } else {
+ numTry = -1
+ }
+ } while (numTry > 0)
+ if (numTry == 0) {
+ // tried so many times and still not able to find Worker
+ throw new WorkerTooBusyException(
+ s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
+ }
+ LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
+ val future = executors.submit(
+ new Callable[QueryResponse] {
+ override def call(): QueryResponse = worker.service.query(request)
+ }
+ )
+ worker.workload.incrementAndGet()
+ (worker, future)
+ }
+
+ private def pickWorker[T: ClassTag](splitAddress: String) = {
+ try {
+ workers(splitAddress)
+ } catch {
+ case e: NoSuchElementException =>
+ // no local worker available, choose one worker randomly
+ pickRandomWorker()
+ }
+ }
+
+ /** pick a worker randomly */
+ private def pickRandomWorker() = {
+ val index = random.nextInt(workers.size)
+ workers.toSeq(index)._2
+ }
+
+ /** pick the next worker of the input worker in the [[Scheduler.workers]] */
+ private def pickNextWorker(worker: Schedulable) = {
+ val index = workers.zipWithIndex.find { case ((address, w), index) =>
+ w == worker
+ }.get._2
+ if (index == workers.size - 1) {
+ workers.toSeq.head._2
+ } else {
+ workers.toSeq(index + 1)._2
+ }
+ }
+
+ /** A new searcher is trying to register, add it to the map and connect to this searcher */
+ def addWorker(address: String, schedulable: Schedulable): Unit = {
+ require(schedulable != null)
+ require(address.equals(schedulable.address))
+ workers(address) = schedulable
+ }
+
+ def removeWorker(address: String): Unit = {
+ workers.remove(address)
+ }
+
+ def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
+}
+
+/**
+ * Represent a Worker which [[Scheduler]] can send
+ * Search request on it
+ * @param id Worker ID, a UUID string
+ * @param cores, number of cores in Worker
+ * @param service RPC service reference
+ * @param workload number of outstanding request sent to Worker
+ */
+private[store] class Schedulable(
+ val id: String,
+ val address: String,
+ val port: Int,
+ val cores: Int,
+ val service: QueryService,
+ var workload: AtomicInteger) {
+ def this(id: String, address: String, port: Int, cores: Int, service: QueryService) = {
+ this(id, address, port, cores, service, new AtomicInteger())
+ }
+}
+
+class WorkerTooBusyException(message: String) extends RuntimeException(message)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
new file mode 100644
index 0000000..2ded00b
--- /dev/null
+++ b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+import java.net.{BindException, InetAddress}
+
+import org.apache.hadoop.ipc.RPC
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.{QueryService, RegistryService, ServiceFactory}
+import org.apache.carbondata.store.rpc.impl.QueryServiceImpl
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest
+
+@InterfaceAudience.Internal
+private[store] object Worker {
+ private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ private val hostAddress = InetAddress.getLocalHost.getHostAddress
+ private var port: Int = _
+ private var registry: RegistryService = _
+
+ def init(masterHostAddress: String, masterPort: Int): Unit = {
+ LOG.info(s"initializing worker...")
+ startService()
+ LOG.info(s"registering to master $masterHostAddress:$masterPort")
+ val workerId = registerToMaster(masterHostAddress, masterPort)
+ LOG.info(s"worker registered to master, workerId: $workerId")
+ }
+
+ def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
+ val hadoopConf = FileFactory.getConfiguration
+ val builder = new RPC.Builder(hadoopConf)
+ builder
+ .setNumHandlers(Runtime.getRuntime.availableProcessors)
+ .setBindAddress(serverHost)
+ .setPort(serverPort)
+ .setProtocol(classOf[QueryService])
+ .setInstance(new QueryServiceImpl)
+ .build
+ }
+
+ /**
+ * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
+ */
+ private def startService(): Unit = {
+ new Thread(new Runnable {
+ override def run(): Unit = {
+ port = CarbonProperties.getSearchWorkerPort
+ var searchServer: RPC.Server = null
+ var exception: BindException = null
+ var numTry = 100 // we will try to create service at worse case 100 times
+ do {
+ try {
+ LOG.info(s"building search-service on $hostAddress:$port")
+ searchServer = buildServer(hostAddress, port)
+ numTry = 0
+ } catch {
+ case e: BindException =>
+ // port is occupied, increase the port number and try again
+ exception = e
+ LOG.error(s"start search-service failed: ${e.getMessage}")
+ port = port + 1
+ numTry = numTry - 1
+ }
+ } while (numTry > 0)
+ if (searchServer == null) {
+ // we have tried many times, but still failed to find an available port
+ throw exception
+ }
+ LOG.info("starting search-service")
+ searchServer.start()
+ LOG.info("search-service started")
+ }
+ }).start()
+ }
+
+ private def registerToMaster(registryHostAddress: String, registryPort: Int): String = {
+ LOG.info(s"trying to register to master $registryHostAddress:$registryPort")
+ if (registry == null) {
+ registry = ServiceFactory.createRegistryService(registryHostAddress, registryPort)
+ }
+ val cores = Runtime.getRuntime.availableProcessors()
+ val request = new RegisterWorkerRequest(hostAddress, port, cores)
+ val response = try {
+ registry.registerWorker(request)
+ } catch {
+ case throwable: Throwable =>
+ LOG.error(s"worker failed to registered: $throwable")
+ throw new IOException(throwable)
+ }
+
+ LOG.info("worker registered")
+ response.getWorkerId
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
new file mode 100644
index 0000000..c885a26
--- /dev/null
+++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+import org.apache.carbondata.sdk.file.TestUtil;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LocalCarbonStoreTest {
+ @Before
+ public void cleanFile() {
+ assert (TestUtil.cleanMdtFile());
+ }
+
+ @After
+ public void verifyDMFile() {
+ assert (!TestUtil.verifyMdtFile());
+ }
+
+ // TODO: complete this testcase
+ // Currently result rows are empty, because SDK is not writing table status file
+ // so that reader does not find any segment.
+ // Complete this testcase after flat folder reader is done.
+ @Test
+ public void testWriteAndReadFiles() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
+
+ CarbonStore store = new LocalCarbonStore();
+ Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null);
+
+ while (rows.hasNext()) {
+ CarbonRow row = rows.next();
+ System.out.println(row.toString());
+ }
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
new file mode 100644
index 0000000..9b9aa9e
--- /dev/null
+++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.junit.Assert;
+
+public class TestUtil {
+
+ static void writeFilesAndVerify(Schema schema, String path) {
+ writeFilesAndVerify(schema, path, null);
+ }
+
+ static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+ writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true);
+ }
+
+ public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema) {
+ writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true);
+ }
+
+ public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema,
+ boolean isTransactionalTable) {
+ writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+ }
+
+ /**
+ * write file and verify
+ *
+ * @param rows number of rows
+ * @param schema schema
+ * @param path table store path
+ * @param persistSchema whether persist schema
+ * @param isTransactionalTable whether is transactional table
+ */
+ public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema,
+ boolean isTransactionalTable) {
+ writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+ }
+
+ /**
+ * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
+ * @param rows number of rows to write
+ * @param schema schema of the file
+ * @param path local write path
+ * @param sortColumns sort columns
+ * @param persistSchema true if want to persist schema file
+ * @param blockletSize blockletSize in the file, -1 for default size
+ * @param blockSize blockSize in the file, -1 for default size
+ * @param isTransactionalTable set to true if this is written for Transactional Table.
+ */
+ static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+ boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) {
+ try {
+ CarbonWriterBuilder builder = CarbonWriter.builder()
+ .isTransactionalTable(isTransactionalTable)
+ .outputPath(path);
+ if (sortColumns != null) {
+ builder = builder.sortBy(sortColumns);
+ }
+ if (persistSchema) {
+ builder = builder.persistSchemaFile(true);
+ }
+ if (blockletSize != -1) {
+ builder = builder.withBlockletSize(blockletSize);
+ }
+ if (blockSize != -1) {
+ builder = builder.withBlockSize(blockSize);
+ }
+
+ CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+
+ for (int i = 0; i < rows; i++) {
+ writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+ }
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } catch (InvalidLoadOptionException l) {
+ l.printStackTrace();
+ Assert.fail(l.getMessage());
+ }
+
+ File segmentFolder = null;
+ if (isTransactionalTable) {
+ segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+ } else {
+ segmentFolder = new File(path);
+ Assert.assertTrue(segmentFolder.exists());
+ }
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertTrue(dataFiles.length > 0);
+ }
+
+ /**
+ * verify whether the file exists
+ * if delete the file success or file not exists, then return true; otherwise return false
+ *
+ * @return boolean
+ */
+ public static boolean cleanMdtFile() {
+ String fileName = CarbonProperties.getInstance().getSystemFolderLocation()
+ + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+ try {
+ if (FileFactory.isFileExist(fileName)) {
+ File file = new File(fileName);
+ file.delete();
+ return true;
+ } else {
+ return true;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ /**
+ * verify whether the mdt file exists
+ * if the file exists, then return true; otherwise return false
+ *
+ * @return boolean
+ */
+ public static boolean verifyMdtFile() {
+ String fileName = CarbonProperties.getInstance().getSystemFolderLocation()
+ + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+ try {
+ if (FileFactory.isFileExist(fileName)) {
+ return true;
+ }
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException("IO exception:", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
new file mode 100644
index 0000000..95e7335
--- /dev/null
+++ b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import org.apache.hadoop.ipc.ProtocolSignature
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.store.rpc.QueryService
+import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse, ShutdownRequest, ShutdownResponse}
+
+class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
+
+ var scheduler: Scheduler = _
+ var w1: Schedulable = _
+ var w2: Schedulable = _
+ var w3: Schedulable = _
+
+ override def beforeEach(): Unit = {
+ scheduler = new Scheduler()
+ w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
+ w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
+ w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
+
+ scheduler.addWorker("1.1.1.1", w1)
+ scheduler.addWorker("1.1.1.2", w2)
+ scheduler.addWorker("1.1.1.3", w3)
+ }
+
+ test("test addWorker, removeWorker, getAllWorkers") {
+ assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+ scheduler.removeWorker("1.1.1.2")
+ assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+ val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
+ scheduler.addWorker("1.1.1.4", w4)
+ assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
+ assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
+ }
+
+ test("test normal schedule") {
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ assertResult(w1.id)(r1.id)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ assertResult(w2.id)(r2.id)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ assertResult(w3.id)(r3.id)
+ val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ assertResult(w1.id)(r4.id)
+ val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ assertResult(w2.id)(r5.id)
+ val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ assertResult(w3.id)(r6.id)
+ }
+
+ test("test worker unavailable") {
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
+ assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
+ }
+
+ test("test reschedule when target worker is overload") {
+ // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
+ (1 to 40).foreach { i =>
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+ val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ // it must be worker1 since worker3 exceed max workload
+ assertResult(w1.id)(r.id)
+ }
+
+ test("test all workers are overload") {
+ // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
+ (1 to 40).foreach { i =>
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ val e = intercept[WorkerTooBusyException] {
+ scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+ }
+
+ test("test user configured overload param") {
+ val original = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
+
+ (1 to 3).foreach { i =>
+ val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+ val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+ val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ val e = intercept[WorkerTooBusyException] {
+ scheduler.sendRequestAsync("1.1.1.3", null)
+ }
+
+ if (original != null) {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
+ }
+ }
+
+ test("test invalid property") {
+ intercept[IllegalArgumentException] {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
+ }
+ var value = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+ assertResult(null)(value)
+
+ intercept[NumberFormatException] {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
+ }
+ value = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+ assertResult(null)(value)
+ }
+}
+
+class DummyRef extends QueryService {
+ override def query(request: QueryRequest): QueryResponse = ???
+
+ override def shutdown(request: ShutdownRequest): ShutdownResponse = ???
+
+ override def getProtocolVersion(protocol: String,
+ clientVersion: Long): Long = ???
+
+ override def getProtocolSignature(protocol: String,
+ clientVersion: Long,
+ clientMethodsHash: Int): ProtocolSignature = ???
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
deleted file mode 100644
index bafbb9f..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-
-/**
- * ReadSupport that convert row object to CarbonRow
- */
-@InterfaceAudience.Internal
-public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
- private CarbonReadSupport<Object[]> delegate;
-
- public CarbonRowReadSupport() {
- this.delegate = new DictionaryDecodeReadSupport<>();
- }
-
- @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
- throws IOException {
- delegate.initialize(carbonColumns, carbonTable);
- }
-
- @Override public CarbonRow readRow(Object[] data) {
- Object[] converted = delegate.readRow(data);
- return new CarbonRow(converted);
- }
-
- @Override public void close() {
- delegate.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
deleted file mode 100644
index c6b2fb8..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.scan.expression.Expression;
-
-/**
- * User can use {@link CarbonStore} to query data
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface CarbonStore extends Closeable {
-
- /**
- * Scan query on the data in the table path
- * @param path table path
- * @param projectColumns column names to read
- * @return rows
- * @throws IOException if unable to read files in table path
- */
- Iterator<CarbonRow> scan(
- String path,
- String[] projectColumns) throws IOException;
-
- /**
- * Scan query with filter, on the data in the table path
- * @param path table path
- * @param projectColumns column names to read
- * @param filter filter condition, can be null
- * @return rows that satisfy filter condition
- * @throws IOException if unable to read files in table path
- */
- Iterator<CarbonRow> scan(
- String path,
- String[] projectColumns,
- Expression filter) throws IOException;
-
- /**
- * SQL query, table should be created before calling this function
- * @param sqlString SQL statement
- * @return rows
- * @throws IOException if unable to read files in table path
- */
- Iterator<CarbonRow> sql(String sqlString) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
deleted file mode 100644
index daa1447..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A CarbonStore implementation that works locally, without other compute framework dependency.
- * It can be used to read data in local disk.
- *
- * Note that this class is experimental, it is not intended to be used in production.
- */
-@InterfaceAudience.Internal
-class LocalCarbonStore extends MetaCachedCarbonStore {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
-
- @Override
- public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException {
- return scan(path, projectColumns, null);
- }
-
- @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
- throws IOException {
- Objects.requireNonNull(path);
- Objects.requireNonNull(projectColumns);
-
- CarbonTable table = getTable(path);
- if (table.isStreamingSink() || table.isHivePartitionTable()) {
- throw new UnsupportedOperationException("streaming and partition table is not supported");
- }
- // TODO: use InputFormat to prune data and read data
-
- final CarbonTableInputFormat format = new CarbonTableInputFormat();
- final Job job = new Job(new Configuration());
- CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo());
- CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath());
- CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
- CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
- CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
- CarbonInputFormat
- .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
- if (filter != null) {
- CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
- }
-
- final List<InputSplit> splits =
- format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-
- List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
-
- List<CarbonRow> rows = new ArrayList<>();
-
- try {
- for (InputSplit split : splits) {
- TaskAttemptContextImpl attempt =
- new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader reader = format.createRecordReader(split, attempt);
- reader.initialize(split, attempt);
- readers.add(reader);
- }
-
- for (RecordReader<Void, Object> reader : readers) {
- while (reader.nextKeyValue()) {
- rows.add((CarbonRow) reader.getCurrentValue());
- }
- try {
- reader.close();
- } catch (IOException e) {
- LOGGER.error(e);
- }
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- for (RecordReader<Void, Object> reader : readers) {
- try {
- reader.close();
- } catch (IOException e) {
- LOGGER.error(e);
- }
- }
- }
- return rows.iterator();
- }
-
- @Override
- public Iterator<CarbonRow> sql(String sqlString) throws IOException {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
deleted file mode 100644
index e43f750..0000000
--- a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * A CarbonStore base class that caches CarbonTable object
- */
-@InterfaceAudience.Internal
-abstract class MetaCachedCarbonStore implements CarbonStore {
-
- // mapping of table path to CarbonTable object
- private Map<String, CarbonTable> cache = new HashMap<>();
-
- CarbonTable getTable(String path) throws IOException {
- if (cache.containsKey(path)) {
- return cache.get(path);
- }
- org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
- .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
- tableInfo1.setTablePath(path);
- CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1);
- cache.put(path, table);
- return table;
- }
-
- @Override
- public void close() throws IOException {
- cache.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
deleted file mode 100644
index c885a26..0000000
--- a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.sdk.file.TestUtil;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class LocalCarbonStoreTest {
- @Before
- public void cleanFile() {
- assert (TestUtil.cleanMdtFile());
- }
-
- @After
- public void verifyDMFile() {
- assert (!TestUtil.verifyMdtFile());
- }
-
- // TODO: complete this testcase
- // Currently result rows are empty, because SDK is not writing table status file
- // so that reader does not find any segment.
- // Complete this testcase after flat folder reader is done.
- @Test
- public void testWriteAndReadFiles() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
-
- CarbonStore store = new LocalCarbonStore();
- Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null);
-
- while (rows.hasNext()) {
- CarbonRow row = rows.next();
- System.out.println(row.toString());
- }
-
- FileUtils.deleteDirectory(new File(path));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
deleted file mode 100644
index 6acbbfb..0000000
--- a/store/search/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-parent</artifactId>
- <version>1.5.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>carbondata-search</artifactId>
- <name>Apache CarbonData :: Search </name>
-
- <properties>
- <dev.path>${basedir}/../../dev</dev.path>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <!-- Note config is repeated in surefire config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>CarbonTestSuite.txt</filereports>
- <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- </argLine>
- <stderr />
- <environmentVariables>
- </environmentVariables>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
- </systemProperties>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
deleted file mode 100644
index 0a3110e..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.spark.search.SearchRequest;
-import org.apache.spark.search.SearchResult;
-import org.apache.spark.search.ShutdownRequest;
-import org.apache.spark.search.ShutdownResponse;
-
-/**
- * Thread runnable for handling SearchRequest from master.
- */
-@InterfaceAudience.Internal
-public class SearchRequestHandler {
-
- private static final LogService LOG =
- LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
-
- public SearchResult handleSearch(SearchRequest request) {
- try {
- LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
- List<CarbonRow> rows = handleRequest(request);
- LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
- return createSuccessResponse(request, rows);
- } catch (IOException | InterruptedException e) {
- LOG.error(e);
- LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
- return createFailureResponse(request, e);
- }
- }
-
- public ShutdownResponse handleShutdown(ShutdownRequest request) {
- LOG.info("Shutting down worker...");
- SearchModeDetailQueryExecutor.shutdownThreadPool();
- SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
- LOG.info("Worker shutted down");
- return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
- }
-
- private DataMapExprWrapper chooseFGDataMap(
- CarbonTable table,
- FilterResolverIntf filterInterface) {
- DataMapChooser chooser = null;
- try {
- chooser = new DataMapChooser(table);
- return chooser.chooseFGDataMap(filterInterface);
- } catch (IOException e) {
- LOG.audit(e.getMessage());
- return null;
- }
- }
-
- /**
- * Builds {@link QueryModel} and read data from files
- */
- private List<CarbonRow> handleRequest(SearchRequest request)
- throws IOException, InterruptedException {
- CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
- carbonTaskInfo.setTaskId(System.nanoTime());
- ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
- TableInfo tableInfo = request.tableInfo();
- CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
- QueryModel queryModel = createQueryModel(table, request);
-
- // in search mode, plain reader is better since it requires less memory
- queryModel.setVectorReader(false);
-
- CarbonMultiBlockSplit mbSplit = request.split().value();
- List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
- queryModel.setTableBlockInfos(list);
- long limit = request.limit();
- long rowCount = 0;
-
- LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
- request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
- DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
- queryModel.getFilterExpressionResolverTree());
-
- // If there is DataMap selected in Master, prune the split by it
- if (fgDataMap != null) {
- queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
- }
-
- // In search mode, reader will read multiple blocks by using a thread pool
- CarbonRecordReader<CarbonRow> reader =
- new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
-
- // read all rows by the reader
- List<CarbonRow> rows = new LinkedList<>();
- try {
- reader.initialize(mbSplit, null);
-
- // loop to read required number of rows.
- // By default, if user does not specify the limit value, limit is Long.MaxValue
- while (reader.nextKeyValue() && rowCount < limit) {
- rows.add(reader.getCurrentValue());
- rowCount++;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- reader.close();
- }
- LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
- request.searchId(), rows.size()));
- return rows;
- }
-
- /**
- * If there is FGDataMap defined for this table and filter condition in the query,
- * prune the splits by the DataMap and set the pruned split into the QueryModel and return
- */
- private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
- CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
- Objects.requireNonNull(datamap);
- List<Segment> segments = new LinkedList<>();
- HashMap<String, Integer> uniqueSegments = new HashMap<>();
- LoadMetadataDetails[] loadMetadataDetails =
- SegmentStatusManager.readLoadMetadata(
- CarbonTablePath.getMetadataPath(table.getTablePath()));
- for (CarbonInputSplit split : mbSplit.getAllSplits()) {
- String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
- if (uniqueSegments.get(segmentId) == null) {
- segments.add(Segment.toSegment(segmentId,
- new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
- loadMetadataDetails)));
- uniqueSegments.put(segmentId, 1);
- } else {
- uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
- }
- }
-
- List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
- List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
- for (int i = 0; i < distributables.size(); i++) {
- DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
- prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
- }
-
- HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
- for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
- pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
- }
-
- List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
- List<TableBlockInfo> blockToRead = new LinkedList<>();
- for (TableBlockInfo block : blocks) {
- if (pathToRead.keySet().contains(block.getFilePath())) {
- // If not set this, it will can't create FineGrainBlocklet object in
- // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
- block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
- blockToRead.add(block);
- }
- }
- LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
- blockToRead.size()));
- queryModel.setTableBlockInfos(blockToRead);
- queryModel.setFG(true);
- return queryModel;
- }
-
- private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
- String[] projectColumns = request.projectColumns();
- Expression filter = null;
- if (request.filterExpression() != null) {
- filter = request.filterExpression();
- }
- return new QueryModelBuilder(table)
- .projectColumns(projectColumns)
- .filterExpression(filter)
- .build();
- }
-
- /**
- * create a failure response
- */
- private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
- return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
- new Object[0][]);
- }
-
- /**
- * create a success response with result rows
- */
- private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
- Iterator<CarbonRow> itor = rows.iterator();
- Object[][] output = new Object[rows.size()][];
- int i = 0;
- while (itor.hasNext()) {
- output[i++] = itor.next().getData();
- }
- return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
deleted file mode 100644
index 71df3e0..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
- SUCCESS, FAILURE
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa111380/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
deleted file mode 100644
index b7630fb..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search._
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.worker.Status
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-@InterfaceAudience.Internal
-class Master(sparkConf: SparkConf) {
- private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- // worker host address map to EndpointRef
-
- private val random = new Random
-
- private var rpcEnv: RpcEnv = _
-
- private val scheduler: Scheduler = new Scheduler
-
- /** start service and listen on port passed in constructor */
- def startService(): Unit = {
- if (rpcEnv == null) {
- LOG.info("Start search mode master thread")
- val isStarted: AtomicBoolean = new AtomicBoolean(false)
- new Thread(new Runnable {
- override def run(): Unit = {
- val hostAddress = InetAddress.getLocalHost.getHostAddress
- var port = CarbonProperties.getSearchMasterPort
- var exception: BindException = null
- var numTry = 100 // we will try to create service at worse case 100 times
- do {
- try {
- LOG.info(s"starting registry-service on $hostAddress:$port")
- val config = RpcEnvConfig(
- sparkConf, "registry-service", hostAddress, "", port,
- new SecurityManager(sparkConf), clientMode = false)
- rpcEnv = new NettyRpcEnvFactory().create(config)
- numTry = 0
- } catch {
- case e: BindException =>
- // port is occupied, increase the port number and try again
- exception = e
- LOG.error(s"start registry-service failed: ${e.getMessage}")
- port = port + 1
- numTry = numTry - 1
- }
- } while (numTry > 0)
- if (rpcEnv == null) {
- // we have tried many times, but still failed to find an available port
- throw exception
- }
- val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
- rpcEnv.setupEndpoint("registry-service", registryEndpoint)
- if (isStarted.compareAndSet(false, false)) {
- synchronized {
- isStarted.compareAndSet(false, true)
- }
- }
- LOG.info("registry-service started")
- rpcEnv.awaitTermination()
- }
- }).start()
- var count = 0
- val countThreshold = 5000
- while (isStarted.compareAndSet(false, false) && count < countThreshold) {
- LOG.info(s"Waiting search mode master to start, retrying $count times")
- Thread.sleep(10)
- count = count + 1;
- }
- if (count >= countThreshold) {
- LOG.error(s"Search mode try $countThreshold times to start master but failed")
- throw new RuntimeException(
- s"Search mode try $countThreshold times to start master but failed")
- } else {
- LOG.info("Search mode master started")
- }
- } else {
- LOG.info("Search mode master has already started")
- }
- }
-
- def stopService(): Unit = {
- if (rpcEnv != null) {
- rpcEnv.shutdown()
- rpcEnv = null
- }
- }
-
- def stopAllWorkers(): Unit = {
- val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
- (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
- }
- futures.foreach { case (address, future) =>
- ThreadUtils.awaitResult(future, Duration.apply("10s"))
- future.value match {
- case Some(result) =>
- result match {
- case Success(response) => scheduler.removeWorker(address)
- case Failure(throwable) => throw new IOException(throwable.getMessage)
- }
- case None => throw new ExecutionTimeoutException
- }
- }
- }
-
- /** A new searcher is trying to register, add it to the map and connect to this searcher */
- def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
- LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
- s"with ${request.cores} cores")
- val workerId = UUID.randomUUID().toString
- val workerAddress = request.hostAddress
- val workerPort = request.port
- LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
-
- val endPointRef =
- rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
- scheduler.addWorker(workerAddress,
- new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
- LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
- RegisterWorkerResponse(workerId)
- }
-
- /**
- * Execute search by firing RPC call to worker, return the result rows
- * @param table table to search
- * @param columns projection column names
- * @param filter filter expression
- * @param globalLimit max number of rows required in Master
- * @param localLimit max number of rows required in Worker
- * @return
- */
- def search(table: CarbonTable, columns: Array[String], filter: Expression,
- globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
- Objects.requireNonNull(table)
- Objects.requireNonNull(columns)
- if (globalLimit < 0 || localLimit < 0) {
- throw new IllegalArgumentException("limit should be positive")
- }
-
- val queryId = random.nextInt
- var rowCount = 0
- val output = new ArrayBuffer[CarbonRow]
-
- def onSuccess(result: SearchResult): Unit = {
- // in case of RPC success, collect all rows in response message
- if (result.queryId != queryId) {
- throw new IOException(
- s"queryId in response does not match request: ${result.queryId} != $queryId")
- }
- if (result.status != Status.SUCCESS.ordinal()) {
- throw new IOException(s"failure in worker: ${ result.message }")
- }
-
- val itor = result.rows.iterator
- while (itor.hasNext && rowCount < globalLimit) {
- output += new CarbonRow(itor.next())
- rowCount = rowCount + 1
- }
- LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
- }
- def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
- def onTimedout() = throw new ExecutionTimeoutException()
-
- // prune data and get a mapping of worker hostname to list of blocks,
- // then add these blocks to the SearchRequest and fire the RPC call
- val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
- val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
- // Build a SearchRequest
- val split = new SerializableWritable[CarbonMultiBlockSplit](
- new CarbonMultiBlockSplit(blocks, splitAddress))
- val request =
- SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
- // Find an Endpoind and send the request to it
- // This RPC is non-blocking so that we do not need to wait before send to next worker
- scheduler.sendRequestAsync[SearchResult](splitAddress, request)
- }
-
- // loop to get the result of each Worker
- tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
-
- // if we have enough data already, we do not need to collect more result
- if (rowCount < globalLimit) {
- // wait for worker
- val timeout = CarbonProperties
- .getInstance()
- .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
- CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
- ThreadUtils.awaitResult(future, Duration.apply(timeout))
- LOG.info(s"[SearchId:$queryId] receive search response from worker " +
- s"${worker.address}:${worker.port}")
- try {
- future.value match {
- case Some(response: Try[SearchResult]) =>
- response match {
- case Success(result) => onSuccess(result)
- case Failure(e) => onFaiure(e)
- }
- case None => onTimedout()
- }
- } finally {
- worker.workload.decrementAndGet()
- }
- }
- }
- output.toArray
- }
-
- /**
- * Prune data by using CarbonInputFormat.getSplit
- * Return a mapping of host address to list of block
- */
- private def pruneBlock(
- table: CarbonTable,
- columns: Array[String],
- filter: Expression): JMap[String, JList[Distributable]] = {
- val jobConf = new JobConf(new Configuration)
- val job = new Job(jobConf)
- val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
- job, table, columns, filter, null, null)
-
- // We will do FG pruning in reader side, so don't do it here
- CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
- val splits = format.getSplits(job)
- val distributables = splits.asScala.map { split =>
- split.asInstanceOf[Distributable]
- }
- CarbonLoaderUtil.nodeBlockMapping(
- distributables.asJava,
- -1,
- getWorkers.asJava,
- CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
- null)
- }
-
- /** return hostname of all workers */
- def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception if execution timed out in search mode
-class ExecutionTimeoutException extends RuntimeException