You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/03 11:16:03 UTC

carbondata git commit: [CARBONDATA-2422] Search mode Master port should be dynamic

Repository: carbondata
Updated Branches:
  refs/heads/master 2c095542b -> d1139330f


[CARBONDATA-2422] Search mode Master port should be dynamic

In SDV test, sometimes search mode testcase failed because Master port is occupied. This PR adds support for dynamic master port to avoid port binding failure

This closes #2256


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d1139330
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d1139330
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d1139330

Branch: refs/heads/master
Commit: d1139330f0225950b664b37447eb48ae5662ebb4
Parents: 2c09554
Author: Jacky Li <ja...@qq.com>
Authored: Tue May 1 20:25:16 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu May 3 16:45:47 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 +-
 .../scala/org/apache/spark/rpc/Master.scala     | 39 ++++++++++++++------
 2 files changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1139330/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 553698a..648f08e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1666,7 +1666,9 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
 
   /**
-   * In search mode, Master will listen on this port for worker registration
+   * In search mode, Master will listen on this port for worker registration.
+   * If Master failed to start service with this port, it will try to increment the port number
+   * and try to bind again, until it is success
    */
   @CarbonProperty
   @InterfaceStability.Unstable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1139330/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index e98a780..df793b4 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.rpc
 
 import java.io.IOException
-import java.net.InetAddress
+import java.net.{BindException, InetAddress}
 import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
 
 import scala.collection.JavaConverters._
@@ -49,11 +49,11 @@ import org.apache.carbondata.store.worker.Status
 
 /**
  * Master of CarbonSearch.
- * It listens to [[Master.port]] to wait for worker to register.
+ * It provides a Registry service for worker to register.
  * And it provides search API to fire RPC call to workers.
  */
 @InterfaceAudience.Internal
-class Master(sparkConf: SparkConf, port: Int) {
+class Master(sparkConf: SparkConf) {
   private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // worker host address map to EndpointRef
@@ -64,22 +64,39 @@ class Master(sparkConf: SparkConf, port: Int) {
 
   private val scheduler: Scheduler = new Scheduler
 
-  def this(sparkConf: SparkConf) = {
-    this(sparkConf, CarbonProperties.getSearchMasterPort)
-  }
-
   /** start service and listen on port passed in constructor */
   def startService(): Unit = {
     if (rpcEnv == null) {
       new Thread(new Runnable {
         override def run(): Unit = {
           val hostAddress = InetAddress.getLocalHost.getHostAddress
-          val config = RpcEnvConfig(
-            sparkConf, "registry-service", hostAddress, "", CarbonProperties.getSearchMasterPort,
-            new SecurityManager(sparkConf), clientMode = false)
-          rpcEnv = new NettyRpcEnvFactory().create(config)
+          var port = CarbonProperties.getSearchMasterPort
+          var exception: BindException = null
+          var numTry = 100  // we will try to create service at worse case 100 times
+          do {
+            try {
+              LOG.info(s"starting registry-service on $hostAddress:$port")
+              val config = RpcEnvConfig(
+                sparkConf, "registry-service", hostAddress, "", port,
+                new SecurityManager(sparkConf), clientMode = false)
+              rpcEnv = new NettyRpcEnvFactory().create(config)
+              numTry = 0
+            } catch {
+              case e: BindException =>
+                // port is occupied, increase the port number and try again
+                exception = e
+                LOG.error(s"start registry-service failed: ${e.getMessage}")
+                port = port + 1
+                numTry = numTry - 1
+            }
+          } while (numTry > 0)
+          if (rpcEnv == null) {
+            // we have tried many times, but still failed to find an available port
+            throw exception
+          }
           val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
           rpcEnv.setupEndpoint("registry-service", registryEndpoint)
+          LOG.info("registry-service started")
           rpcEnv.awaitTermination()
         }
       }).start()