You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:21 UTC

[08/50] [abbrv] carbondata git commit: [CARBONDATA-2408] Fix SaslException in search mode master on first start

[CARBONDATA-2408] Fix SaslException in search mode master on first start

This closes #2239


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fb128974
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fb128974
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fb128974

Branch: refs/heads/spark-2.3
Commit: fb12897474f8eb9b4ca04764f9ee15890573b057
Parents: 452c42b
Author: xubo245 <60...@qq.com>
Authored: Fri Apr 27 18:57:37 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed May 9 11:02:48 2018 +0800

----------------------------------------------------------------------
 .../carbondata/store/SparkCarbonStore.scala     |  1 +
 .../scala/org/apache/spark/rpc/Master.scala     | 24 ++++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index c0d0d09..3a6adea 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -110,6 +110,7 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
   }
 
   def startSearchMode(): Unit = {
+    LOG.info("Starting search mode master")
     master = new Master(session.sparkContext.getConf)
     master.startService()
     startAllWorkers()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb128974/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index df793b4..5b31a49 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rpc
 import java.io.IOException
 import java.net.{BindException, InetAddress}
 import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -67,6 +68,8 @@ class Master(sparkConf: SparkConf) {
   /** start service and listen on port passed in constructor */
   def startService(): Unit = {
     if (rpcEnv == null) {
+      LOG.info("Start search mode master thread")
+      val isStarted: AtomicBoolean = new AtomicBoolean(false)
       new Thread(new Runnable {
         override def run(): Unit = {
           val hostAddress = InetAddress.getLocalHost.getHostAddress
@@ -96,10 +99,31 @@ class Master(sparkConf: SparkConf) {
           }
           val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
           rpcEnv.setupEndpoint("registry-service", registryEndpoint)
+          if (isStarted.compareAndSet(false, false)) {
+            synchronized {
+              isStarted.compareAndSet(false, true)
+            }
+          }
           LOG.info("registry-service started")
           rpcEnv.awaitTermination()
         }
       }).start()
+      var count = 0
+      val countThreshold = 5000
+      while (isStarted.compareAndSet(false, false) && count < countThreshold) {
+        LOG.info(s"Waiting search mode master to start, retrying $count times")
+        Thread.sleep(10)
+        count = count + 1;
+      }
+      if (count >= countThreshold) {
+        LOG.error(s"Search mode try $countThreshold times to start master but failed")
+        throw new RuntimeException(
+          s"Search mode try $countThreshold times to start master but failed")
+      } else {
+        LOG.info("Search mode master started")
+      }
+    } else {
+      LOG.info("Search mode master has already started")
     }
   }