You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:21 UTC
[08/50] [abbrv] carbondata git commit: [CARBONDATA-2408] Fix search
mode master SaslException issue in the first time
[CARBONDATA-2408] Fix search mode master SaslException issue in the first time
This closes #2239
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fb128974
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fb128974
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fb128974
Branch: refs/heads/spark-2.3
Commit: fb12897474f8eb9b4ca04764f9ee15890573b057
Parents: 452c42b
Author: xubo245 <60...@qq.com>
Authored: Fri Apr 27 18:57:37 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed May 9 11:02:48 2018 +0800
----------------------------------------------------------------------
.../carbondata/store/SparkCarbonStore.scala | 1 +
.../scala/org/apache/spark/rpc/Master.scala | 24 ++++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index c0d0d09..3a6adea 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -110,6 +110,7 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
}
def startSearchMode(): Unit = {
+ LOG.info("Starting search mode master")
master = new Master(session.sparkContext.getConf)
master.startService()
startAllWorkers()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index df793b4..5b31a49 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rpc
import java.io.IOException
import java.net.{BindException, InetAddress}
import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -67,6 +68,8 @@ class Master(sparkConf: SparkConf) {
/** start service and listen on port passed in constructor */
def startService(): Unit = {
if (rpcEnv == null) {
+ LOG.info("Start search mode master thread")
+ val isStarted: AtomicBoolean = new AtomicBoolean(false)
new Thread(new Runnable {
override def run(): Unit = {
val hostAddress = InetAddress.getLocalHost.getHostAddress
@@ -96,10 +99,31 @@ class Master(sparkConf: SparkConf) {
}
val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
rpcEnv.setupEndpoint("registry-service", registryEndpoint)
+ if (isStarted.compareAndSet(false, false)) {
+ synchronized {
+ isStarted.compareAndSet(false, true)
+ }
+ }
LOG.info("registry-service started")
rpcEnv.awaitTermination()
}
}).start()
+ var count = 0
+ val countThreshold = 5000
+ while (isStarted.compareAndSet(false, false) && count < countThreshold) {
+ LOG.info(s"Waiting search mode master to start, retrying $count times")
+ Thread.sleep(10)
+ count = count + 1;
+ }
+ if (count >= countThreshold) {
+ LOG.error(s"Search mode try $countThreshold times to start master but failed")
+ throw new RuntimeException(
+ s"Search mode try $countThreshold times to start master but failed")
+ } else {
+ LOG.info("Search mode master started")
+ }
+ } else {
+ LOG.info("Search mode master has already started")
}
}