You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/14 22:40:19 UTC
[30/50] [abbrv] incubator-geode git commit: GEODE-137: use local
GemFire server to initialize LocalCache whenever possible.
GEODE-137: use local GemFire server to initialize LocalCache whenever possible.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2e2a795d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2e2a795d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2e2a795d
Branch: refs/heads/feature/GEODE-77
Commit: 2e2a795db57e21cc83784f0b3111b0b5e355ad87
Parents: 7d4ae09
Author: Qihong Chen <qc...@pivotal.io>
Authored: Wed Jul 29 10:02:26 2015 -0700
Committer: Qihong Chen <qc...@pivotal.io>
Committed: Thu Aug 6 10:44:36 2015 -0700
----------------------------------------------------------------------
.../connector/GemFirePairRDDFunctions.scala | 5 +-
.../spark/connector/GemFireRDDFunctions.scala | 5 +-
.../internal/DefaultGemFireConnection.scala | 30 +++++--
.../connector/internal/LocatorHelper.scala | 91 +++++++++++++++++++-
.../internal/rdd/GemFireRegionRDD.scala | 2 +-
.../gemfire/spark/connector/package.scala | 7 ++
.../spark/connector/LocatorHelperTest.scala | 77 +++++++++++++++++
7 files changed, 208 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
index 86ec596..8050a5e 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
@@ -23,7 +23,10 @@ class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable w
connConf: GemFireConnectionConf = defaultConnectionConf,
opConf: Map[String, String] = Map.empty): Unit = {
connConf.getConnection.validateRegion[K, V](regionPath)
- logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+ if (log.isDebugEnabled)
+ logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
+ else
+ logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
rdd.sparkContext.runJob(rdd, writer.write _)
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
index 3aa1ebd..5415727 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
@@ -25,7 +25,10 @@ class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging
connConf: GemFireConnectionConf = defaultConnectionConf,
opConf: Map[String, String] = Map.empty): Unit = {
connConf.getConnection.validateRegion[K, V](regionPath)
- logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+ if (log.isDebugEnabled)
+ logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
+ else
+ logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
rdd.sparkContext.runJob(rdd, writer.write(func) _)
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
index bba6c69..3fcb496 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
@@ -1,5 +1,7 @@
package io.pivotal.gemfire.spark.connector.internal
+import java.net.InetAddress
+
import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
import com.gemstone.gemfire.cache.query.Query
@@ -7,7 +9,7 @@ import com.gemstone.gemfire.cache.{Region, RegionService}
import com.gemstone.gemfire.internal.cache.execute.InternalExecution
import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
-import org.apache.spark.Logging
+import org.apache.spark.{SparkEnv, Logging}
import io.pivotal.gemfire.spark.connector.GemFireConnection
import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
import java.util.{Set => JSet, List => JList }
@@ -30,10 +32,7 @@ private[connector] class DefaultGemFireConnection (
private def initClientCache() : ClientCache = {
try {
- import io.pivotal.gemfire.spark.connector.map2Properties
- logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
- val ccf = new ClientCacheFactory(gemFireProps)
- locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
+ val ccf = getClientCacheFactory
ccf.create()
} catch {
case e: Exception =>
@@ -41,6 +40,27 @@ private[connector] class DefaultGemFireConnection (
throw new RuntimeException(e)
}
}
+
+ private def getClientCacheFactory: ClientCacheFactory = {
+ import io.pivotal.gemfire.spark.connector.map2Properties
+ val ccf = new ClientCacheFactory(gemFireProps)
+ ccf.setPoolReadTimeout(30000)
+ val servers = LocatorHelper.getAllGemFireServers(locators)
+ if (servers.isDefined && servers.get.size > 0) {
+ val sparkIp = System.getenv("SPARK_LOCAL_IP")
+ val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName
+ else InetAddress.getLocalHost.getCanonicalHostName
+ val executorId = SparkEnv.get.executorId
+ val pickedServers = LocatorHelper.pickPreferredGemFireServers(servers.get, hostName, executorId)
+ logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
+ logDebug(s"""Init ClientCache: all-severs=${pickedServers.mkString(",")}""")
+ pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) }
+ } else {
+ logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
+ locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
+ }
+ ccf
+ }
/** close the clientCache */
override def close(): Unit =
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
index 550e0bc..a010c62 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
@@ -1,9 +1,17 @@
package io.pivotal.gemfire.spark.connector.internal
+import java.net.InetSocketAddress
+import java.util.{ArrayList => JArrayList}
+
+import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
+import com.gemstone.gemfire.distributed.internal.ServerLocation
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
+import org.apache.spark.Logging
+
import scala.util.{Failure, Success, Try}
-object LocatorHelper {
+object LocatorHelper extends Logging {
/** valid locator strings are: host[port] and host:port */
final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
@@ -27,4 +35,85 @@ object LocatorHelper {
def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
+
+ /**
+ * Return the list of live GemFire servers for the given locators.
+ * @param locators locators for the given GemFire cluster
+ * @param serverGroup optional server group name, default is "" (empty string)
+ */
+ def getAllGemFireServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
+ var result: Option[Seq[(String, Int)]] = None
+ locators.find { case (host, port) =>
+ try {
+ val addr = new InetSocketAddress(host, port)
+ val req = new GetAllServersRequest(serverGroup)
+ val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
+ if (res != null) {
+ import scala.collection.JavaConverters._
+ val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
+ if (servers.size > 0)
+ result = Some(servers.asScala.map(e => (e.getHostName, e.getPort)))
+ }
+ } catch { case e: Exception => logWarning("getAllGemFireServers error", e)
+ }
+ result.isDefined
+ }
+ result
+ }
+
+ /**
+ * Pick up at most 3 preferred servers from all available servers based on
+ * host name and Spark executor id.
+ *
+ * This method is used by DefaultGemFireConnection to create LocalCache. Usually
+ * one server is enough to initialize LocalCacheFactory, but this provides two
+ * backup servers in case the 1st server can't be connected.
+ *
+ * @param servers all available servers in the form of (hostname, port) pairs
+ * @param hostName the host name of the Spark executor
+ * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
+ * @return Seq[(hostname, port)] of preferred servers
+ */
+ def pickPreferredGemFireServers(
+ servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {
+
+ // pick up `length` items from the Seq starting at the `start` position.
+ // The Seq is treated as a ring, so at most `Seq.size` items can be picked
+ def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = {
+ val size = math.min(seq.size, length)
+ (start until start + size).map(x => seq(x % seq.size))
+ }
+
+ // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
+ val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 }
+
+ // algorithm:
+ // 1. sort server list
+ // 2. split sorted server list into 3 sub-lists a, b, and c:
+ // list-a: servers on the given host
+ // list-b: servers that are in front of list-a on the sorted server list
+ // list-c: servers that are behind list-a on the sorted server list
+ // then rotate list-a based on executor id, then create new server list:
+ // modified list-a ++ list-c ++ list-b
+ // 3. if there's no server on the given host, then create new server list
+ // by rotating sorted server list based on executor id.
+ // 4. take up to 3 servers from the new server list
+ val sortedServers = servers.sorted
+ val firstIdx = sortedServers.indexWhere(p => p._1 == hostName)
+ val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName)
+
+ if (firstIdx < 0) { // no local server
+ circularTake(sortedServers, id, 3)
+ } else {
+ val (seq1, seq2) = sortedServers.splitAt(firstIdx)
+ val seq = if (firstIdx == lastIdx) { // one local server
+ seq2 ++ seq1
+ } else { // multiple local servers
+ val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1)
+ val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size)
+ seq3b ++ seq4 ++ seq1
+ }
+ circularTake(seq, 0, 3)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
index cff61d6..3a987b2 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
@@ -82,7 +82,7 @@ class GemFireRegionRDD[K, V] private[connector]
logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
val splits = p.partitions[K, V](conn, data, opConf)
- logDebug(s"""RDD id=${this.id} region=$regionPath partitions=${splits.mkString(",")}""")
+ logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n ${splits.mkString("\n ")}""")
splits
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
index 72a5bb1..d08e96c 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
@@ -25,6 +25,8 @@ package object connector {
final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
final val RDDSaveBatchSizeDefault = 10000
+ /** implicits */
+
implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions =
new GemFireSparkContextFunctions(sc)
@@ -43,4 +45,9 @@ package object connector {
implicit def map2Properties(map: Map[String,String]): java.util.Properties =
(new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
+ /** internal util methods */
+
+ private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String =
+ rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e2a795d/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
index 508666a..de4b7a7 100644
--- a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
@@ -1,5 +1,7 @@
package unittest.io.pivotal.gemfire.spark.connector
+import java.net.InetAddress
+
import io.pivotal.gemfire.spark.connector.internal.LocatorHelper
import org.scalatest.FunSuite
@@ -72,4 +74,79 @@ class LocatorHelperTest extends FunSuite {
intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
}
+ test("pickPreferredGemFireServers: shared servers and one gf-server per host") {
+ val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+ val servers = Seq(srv1, srv2, srv3, srv4)
+ verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+ verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+ verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+ verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+ }
+
+ test("pickPreferredGemFireServers: shared servers, one gf-server per host, un-sorted list") {
+ val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+ val servers = Seq(srv4, srv2, srv3, srv1)
+ verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+ verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+ verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+ verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+ }
+
+ test("pickPreferredGemFireServers: shared servers and two gf-server per host") {
+ val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+ val servers = Seq(srv1, srv2, srv3, srv4)
+ verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+ verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+ verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+ verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+ }
+
+ test("pickPreferredGemFireServers: shared servers, two gf-server per host, un-sorted server list") {
+ val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+ val servers = Seq(srv1, srv4, srv3, srv2)
+ verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+ verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+ verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+ verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+ }
+
+ test("pickPreferredGemFireServers: no shared servers and one gf-server per host") {
+ val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
+ val servers = Seq(srv1, srv2, srv3, srv4)
+ verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3))
+ verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv3, srv4))
+ verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv3, srv4, srv1))
+ verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv4, srv1, srv2))
+ }
+
+ test("pickPreferredGemFireServers: no shared servers, one gf-server per host, and less gf-server") {
+ val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
+ val servers = Seq(srv1, srv2)
+ verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2))
+ verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv1))
+ verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv1, srv2))
+ verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv2, srv1))
+
+
+ println("host name: " + InetAddress.getLocalHost.getHostName)
+ println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName)
+ println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName)
+ }
+
+ test("pickPreferredGemFireServers: ad-hoc") {
+ val (srv4, srv5, srv6) = (
+ ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411))
+ val servers = Seq(srv6, srv5, srv4)
+ verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6))
+ verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6))
+ verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4))
+ verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5))
+ }
+
+ def verifyPickPreferredGemFireServers(
+ servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = {
+ val result = LocatorHelper.pickPreferredGemFireServers(servers, hostName, executorId)
+ assert(result == expectation, s"pick servers for $hostName:$executorId")
+ }
+
}