You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/14 22:40:19 UTC

[30/50] [abbrv] incubator-geode git commit: GEODE-137: use local GemFire server to initialize LocalCache whenever possible.

GEODE-137: use local GemFire server to initialize LocalCache whenever possible.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2e2a795d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2e2a795d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2e2a795d

Branch: refs/heads/feature/GEODE-77
Commit: 2e2a795db57e21cc83784f0b3111b0b5e355ad87
Parents: 7d4ae09
Author: Qihong Chen <qc...@pivotal.io>
Authored: Wed Jul 29 10:02:26 2015 -0700
Committer: Qihong Chen <qc...@pivotal.io>
Committed: Thu Aug 6 10:44:36 2015 -0700

----------------------------------------------------------------------
 .../connector/GemFirePairRDDFunctions.scala     |  5 +-
 .../spark/connector/GemFireRDDFunctions.scala   |  5 +-
 .../internal/DefaultGemFireConnection.scala     | 30 +++++--
 .../connector/internal/LocatorHelper.scala      | 91 +++++++++++++++++++-
 .../internal/rdd/GemFireRegionRDD.scala         |  2 +-
 .../gemfire/spark/connector/package.scala       |  7 ++
 .../spark/connector/LocatorHelperTest.scala     | 77 +++++++++++++++++
 7 files changed, 208 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
index 86ec596..8050a5e 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
@@ -23,7 +23,10 @@ class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable w
       connConf: GemFireConnectionConf = defaultConnectionConf, 
       opConf: Map[String, String] = Map.empty): Unit = {    
     connConf.getConnection.validateRegion[K, V](regionPath)
-    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
     val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
     rdd.sparkContext.runJob(rdd, writer.write _)
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
index 3aa1ebd..5415727 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
@@ -25,7 +25,10 @@ class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging
       connConf: GemFireConnectionConf = defaultConnectionConf,
       opConf: Map[String, String] = Map.empty): Unit = {
     connConf.getConnection.validateRegion[K, V](regionPath)
-    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
     val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
     rdd.sparkContext.runJob(rdd, writer.write(func) _)
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
index bba6c69..3fcb496 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
@@ -1,5 +1,7 @@
 package io.pivotal.gemfire.spark.connector.internal
 
+import java.net.InetAddress
+
 import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
 import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
 import com.gemstone.gemfire.cache.query.Query
@@ -7,7 +9,7 @@ import com.gemstone.gemfire.cache.{Region, RegionService}
 import com.gemstone.gemfire.internal.cache.execute.InternalExecution
 import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
 import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
-import org.apache.spark.Logging
+import org.apache.spark.{SparkEnv, Logging}
 import io.pivotal.gemfire.spark.connector.GemFireConnection
 import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
 import java.util.{Set => JSet, List => JList }
@@ -30,10 +32,7 @@ private[connector] class DefaultGemFireConnection (
 
   private def initClientCache() : ClientCache = {
     try {
-      import io.pivotal.gemfire.spark.connector.map2Properties
-      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
-      val ccf = new ClientCacheFactory(gemFireProps)
-      locators.foreach { case (host, port)  => ccf.addPoolLocator(host, port) }
+      val ccf = getClientCacheFactory
       ccf.create()
     } catch {
       case e: Exception =>
@@ -41,6 +40,27 @@ private[connector] class DefaultGemFireConnection (
         throw new RuntimeException(e)
     }
   }
+  
+  private def getClientCacheFactory: ClientCacheFactory = {
+    import io.pivotal.gemfire.spark.connector.map2Properties
+    val ccf = new ClientCacheFactory(gemFireProps)
+    ccf.setPoolReadTimeout(30000)
+    val servers = LocatorHelper.getAllGemFireServers(locators)
+    if (servers.isDefined && servers.get.size > 0) {
+      val sparkIp = System.getenv("SPARK_LOCAL_IP")
+      val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName
+                     else InetAddress.getLocalHost.getCanonicalHostName
+      val executorId = SparkEnv.get.executorId      
+      val pickedServers = LocatorHelper.pickPreferredGemFireServers(servers.get, hostName, executorId)
+      logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
+      logDebug(s"""Init ClientCache: all-severs=${pickedServers.mkString(",")}""")
+      pickedServers.foreach{ case (host, port)  => ccf.addPoolServer(host, port) }
+    } else {
+      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
+      locators.foreach { case (host, port)  => ccf.addPoolLocator(host, port) }
+    }
+    ccf
+  }
 
   /** close the clientCache */
   override def close(): Unit =

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
index 550e0bc..a010c62 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
@@ -1,9 +1,17 @@
 package io.pivotal.gemfire.spark.connector.internal
 
+import java.net.InetSocketAddress
+import java.util.{ArrayList => JArrayList}
+
+import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
+import com.gemstone.gemfire.distributed.internal.ServerLocation
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
+import org.apache.spark.Logging
+
 import scala.util.{Failure, Success, Try}
 
 
-object LocatorHelper {
+object LocatorHelper extends Logging {
 
   /** valid locator strings are: host[port] and host:port */
   final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
@@ -27,4 +35,85 @@ object LocatorHelper {
   def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
     locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
 
+
+  /**
+   * Return the list of live GemFire servers for the given locators. 
+   * @param locators locators for the given GemFire cluster
+   * @param serverGroup optional server group name, default is "" (empty string)
+   */
+  def getAllGemFireServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
+    var result: Option[Seq[(String, Int)]] = None
+    locators.find { case (host, port) =>
+      try {
+        val addr = new InetSocketAddress(host, port)
+        val req = new GetAllServersRequest(serverGroup)
+        val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
+        if (res != null) {
+          import scala.collection.JavaConverters._
+          val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
+          if (servers.size > 0)
+            result = Some(servers.asScala.map(e => (e.getHostName, e.getPort)))
+        }
+      } catch { case e: Exception => logWarning("getAllGemFireServers error", e)
+      }
+      result.isDefined
+    }
+    result
+  }
+
+  /**
+   * Pick up at most 3 preferred servers from all available servers based on
+   * host name and Spark executor id.
+   *
+   * This method is used by DefaultGemFireConnection to create LocalCache. Usually 
+   * one server is enough to initialize LocalCacheFactory, but this provides two 
+   * backup servers in case of the 1st server can't be connected.
+   *   
+   * @param servers all available servers in the form of (hostname, port) pairs
+   * @param hostName the host name of the Spark executor
+   * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
+   * @return Seq[(hostname, port)] of preferred servers
+   */
+  def pickPreferredGemFireServers(
+    servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {
+
+    // pick up `length` items form the Seq starts at the `start` position.
+    //  The Seq is treated as a ring, so at most `Seq.size` items can be picked
+    def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = {
+      val size = math.min(seq.size, length)
+      (start until start + size).map(x => seq(x % seq.size))
+    }
+
+    // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
+    val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 }
+    
+    // algorithm: 
+    // 1. sort server list
+    // 2. split sorted server list into 3 sub-lists a, b, and c:
+    //      list-a: servers on the given host
+    //      list-b: servers that are in front of list-a on the sorted server list
+    //      list-c: servers that are behind list-a on the sorted server list
+    //    then rotate list-a based on executor id, then create new server list:
+    //      modified list-a ++ list-c ++ list-b
+    // 3. if there's no server on the given host, then create new server list
+    //    by rotating sorted server list based on executor id.
+    // 4. take up to 3 servers from the new server list
+    val sortedServers = servers.sorted
+    val firstIdx = sortedServers.indexWhere(p => p._1 == hostName)
+    val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName)
+
+    if (firstIdx < 0) { // no local server
+      circularTake(sortedServers, id, 3)
+    } else {
+      val (seq1, seq2) = sortedServers.splitAt(firstIdx)
+      val seq = if (firstIdx == lastIdx) {  // one local server
+        seq2 ++ seq1
+      } else { // multiple local server
+        val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1)
+        val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size)
+        seq3b ++ seq4 ++ seq1
+      }
+      circularTake(seq, 0, 3)
+    }
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
index cff61d6..3a987b2 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
@@ -82,7 +82,7 @@ class GemFireRegionRDD[K, V] private[connector]
         logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
         val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
         val splits = p.partitions[K, V](conn, data, opConf)
-        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=${splits.mkString(",")}""")
+        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n  ${splits.mkString("\n  ")}""")
         splits
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
index 72a5bb1..d08e96c 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
@@ -25,6 +25,8 @@ package object connector {
   final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
   final val RDDSaveBatchSizeDefault = 10000
   
+  /** implicits */
+  
   implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions =
     new GemFireSparkContextFunctions(sc)
 
@@ -43,4 +45,9 @@ package object connector {
   implicit def map2Properties(map: Map[String,String]): java.util.Properties =
     (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
 
+  /** internal util methods */
+  
+  private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n  "): String =
+    rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
index 508666a..de4b7a7 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
@@ -1,5 +1,7 @@
 package unittest.io.pivotal.gemfire.spark.connector
 
+import java.net.InetAddress
+
 import io.pivotal.gemfire.spark.connector.internal.LocatorHelper
 import org.scalatest.FunSuite
 
@@ -72,4 +74,79 @@ class LocatorHelperTest extends FunSuite {
     intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
   }
 
+  test("pickPreferredGemFireServers: shared servers and one gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGemFireServers: shared servers, one gf-server per host, un-sorted list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+    val servers = Seq(srv4, srv2, srv3, srv1)
+    verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGemFireServers: shared servers and two gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+    verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+  }
+
+  test("pickPreferredGemFireServers: shared servers, two gf-server per host, un-sorted server list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv4, srv3, srv2)
+    verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+    verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+  }
+
+  test("pickPreferredGemFireServers: no shared servers and one gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGemFireServers: no shared servers, one gf-server per host, and less gf-server") {
+    val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
+    val servers = Seq(srv1, srv2)
+    verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2))
+    verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv1))
+    verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv1, srv2))
+    verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv2, srv1))
+
+
+    println("host name: " + InetAddress.getLocalHost.getHostName)
+    println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName)
+    println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName)
+  }
+
+  test("pickPreferredGemFireServers: ad-hoc") {
+    val (srv4, srv5, srv6) = (
+      ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411))
+    val servers = Seq(srv6, srv5, srv4)
+    verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4))
+    verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5))
+  }
+  
+  def verifyPickPreferredGemFireServers(
+    servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = {
+    val result = LocatorHelper.pickPreferredGemFireServers(servers, hostName, executorId)
+    assert(result == expectation, s"pick servers for $hostName:$executorId")
+  }
+
 }