You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/04/21 00:51:45 UTC
[02/11] incubator-geode git commit: GEODE-1244: Package, directory,
project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
new file mode 100644
index 0000000..798912c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+
+/**
+ * Unit tests for [[DefaultGeodeConnectionManager]], driven by a mocked
+ * [[DefaultGeodeConnectionFactory]].
+ *
+ * The manager is a singleton object whose `connections` cache outlives a single
+ * test, so every test that opens a connection closes it again before it ends.
+ */
+class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("DefaultGeodeConnectionFactory get/closeConnection") {
+    // note: connConf1-4 are all built from the locators host1:1234 and/or host2:5678,
+    // so once connConf3 is connected they all resolve to the same connection
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+    val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
+    val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
+    val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
+
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    val mockConn1 = mock[DefaultGeodeConnection]
+    val mockConn2 = mock[DefaultGeodeConnection]
+    when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
+    when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
+
+    assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
+    // note: following 3 lines do not trigger connFactory.newConnection(...)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
+
+    // connFactory.newConnection(...) was invoked only twice: once per distinct connection
+    verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
+    verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
+    // presumably one cache entry per locator (host1, host2, host5) -- confirm against the manager
+    assert(DefaultGeodeConnectionManager.connections.size == 3)
+
+    // closing via connConf1 also drops the host2 entry that shares the same connection
+    DefaultGeodeConnectionManager.closeConnection(connConf1)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+    DefaultGeodeConnectionManager.closeConnection(connConf5)
+    assert(DefaultGeodeConnectionManager.connections.isEmpty)
+  }
+
+  test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
+    // the factory failure must surface to the caller of getConnection
+    intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
+    verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
+  }
+
+  test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+    val mockConn1 = mock[DefaultGeodeConnection]
+    when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+    // connConf2 has no connection in the connection manager: closing it must be a no-op
+    DefaultGeodeConnectionManager.closeConnection(connConf2)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+
+    // clean up so the singleton manager does not leak this connection into other tests
+    DefaultGeodeConnectionManager.closeConnection(connConf1)
+    assert(DefaultGeodeConnectionManager.connections.isEmpty)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
new file mode 100644
index 0000000..f2303e7
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender}
+import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
+import com.gemstone.gemfire.cache.query.types.ObjectType
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
+import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import scala.collection.JavaConversions._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter {
+
+ /**
+  * Test-only ResultSender that forwards everything it receives straight into a
+  * ResultCollector, wiring the struct ResultSender and ResultCollector together.
+  * Every chunk is cloned before it is handed over, because the
+  * StructStreamingResultSender will reuse the byte array.
+  *
+  * @param collector destination of every forwarded chunk
+  * @param num number of lastResult() calls after which collector.endResults() is invoked
+  */
+ class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
+
+   var finishedNum = 0
+
+   /** Passes a private copy of the given byte-array chunk to the collector. */
+   private def deliver(chunk: Object): Unit = {
+     val copy = chunk.asInstanceOf[Array[Byte]].clone()
+     collector.addResult(null, copy)
+   }
+
+   override def sendResult(result: Object): Unit = deliver(result)
+
+   /** exception should be sent via lastResult() */
+   override def sendException(throwable: Throwable): Unit =
+     throw new UnsupportedOperationException("sendException is not supported.")
+
+   override def lastResult(result: Object): Unit = {
+     deliver(result)
+     // count finished senders under the lock; the one that completes the set ends the results
+     this.synchronized {
+       finishedNum += 1
+       if (finishedNum == num) collector.endResults()
+     }
+   }
+ }
+
+ /** common variables: both are re-assigned by the `before` hook below */
+ var collector: StructStreamingResultCollector = _
+ var baseSender: LocalResultSender = _
+ /** common types: struct types with columns (key, value) and (value), every column typed java.lang.Object */
+ val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
+ val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
+ val OneColType = new StructTypeImpl(Array("value"), Array(objType))
+
+ // create a fresh collector and a LocalResultSender expecting a single sender
+ before {
+ collector = new StructStreamingResultCollector
+ baseSender = new LocalResultSender(collector, 1)
+ }
+
+ test("transfer simple data") {
+   verifySimpleTransfer(sendDataType = true)
+ }
+
+ test("transfer simple data with no type info") {
+   verifySimpleTransfer(sendDataType = false)
+ }
+
+ /**
+  * Sends ten (Int, String) rows through the sender/collector pair and checks that they
+  * come out unchanged and in order, and that the collector reports the two-column type.
+  *
+  * @param sendDataType when false, `null` is passed to the sender instead of the struct type
+  */
+ def verifySimpleTransfer(sendDataType: Boolean): Unit = {
+   val rows = (0 to 9).map { i =>
+     Array[Object](Int.box(i), i.toString * 5)
+   }
+   val dataType = if (sendDataType) TwoColType else null
+   val sender = new StructStreamingResultSender(baseSender, dataType, rows.toIterator)
+   sender.send()
+
+   assert(TwoColType.equals(collector.getResultType))
+   val received = collector.getResult
+   for (i <- 0 to 9) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 2)
+     assert(row(0).asInstanceOf[Int] == i)
+     assert(row(1).asInstanceOf[String] == i.toString * 5)
+   }
+   assert(! received.hasNext)
+ }
+
+
+ /**
+  * Builds a test iterator producing the single-column rows `start`, `start + 1`, ...
+  * @param start the 1st value
+  * @param n number of integers generated
+  * @param genExcp if true, hasNext stays true for one extra step after the n values,
+  *                so the following next() throws. This is used to test exception handling.
+  */
+ def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
+   val lastValue = start + n - 1
+   // with genExcp the iterator advertises one element more than it can deliver
+   val limit = if (genExcp) start + n else lastValue
+   new Iterator[Array[Object]] {
+     private var current: Int = start - 1
+
+     override def hasNext: Boolean = current < limit
+
+     override def next(): Array[Object] = {
+       if (current >= lastValue) throw new RuntimeException("simulated error")
+       current += 1
+       Array(current.asInstanceOf[Object])
+     }
+   }
+ }
+
+ test("transfer data with 0 row") {
+   val noRows = intIterator(1, 0, genExcp = false)
+   new StructStreamingResultSender(baseSender, OneColType, noRows).send()
+   // no result type is reported when no row was transferred
+   assert(collector.getResultType == null)
+   val received = collector.getResult
+   assert(! received.hasNext)
+ }
+
+ test("transfer data with 10K rows") {
+   val rowCount = 10000
+   val sender = new StructStreamingResultSender(baseSender, OneColType, intIterator(1, rowCount, genExcp = false))
+   sender.send()
+
+   assert(OneColType.equals(collector.getResultType))
+   val received = collector.getResult
+   // rows must arrive complete and in the order they were sent
+   for (expected <- 1 to rowCount) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 1)
+     assert(row(0).asInstanceOf[Int] == expected)
+   }
+   assert(! received.hasNext)
+ }
+
+ test("transfer data with 10K rows with 2 sender") {
+   // NOTE(review): despite the test name only `total` (300) rows are sent, half by each sender
+   baseSender = new LocalResultSender(collector, 2)
+   val total = 300
+   val half = total / 2
+   val sender1 = Future {
+     new StructStreamingResultSender(baseSender, OneColType, intIterator(1, half, genExcp = false), "sender1").send()
+   }
+   val sender2 = Future {
+     new StructStreamingResultSender(baseSender, OneColType, intIterator(half + 1, half, genExcp = false), "sender2").send()
+   }
+   Await.result(sender1, 1.seconds)
+   Await.result(sender2, 1.seconds)
+
+   assert(OneColType.equals(collector.getResultType))
+   val received = collector.getResult
+   // the senders run concurrently, so only check that exactly `total` distinct rows arrive
+   val seen = scala.collection.mutable.Set[Int]()
+   for (_ <- 1 to total) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 1)
+     val value = row(0).asInstanceOf[Int]
+     assert(! seen.contains(value))
+     seen.add(value)
+   }
+   assert(! received.hasNext)
+ }
+
+ test("transfer data with 10K rows with 2 sender with error") {
+   // NOTE(review): despite the test name only `total` (1000) rows are attempted.
+   // sender2's iterator throws after its last row (genExcp = true).
+   baseSender = new LocalResultSender(collector, 2)
+   val total = 1000
+   val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
+   val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
+   // method-call syntax instead of the postfix operator `1 seconds`, matching the other tests
+   Await.result(sender1, 1.seconds)
+   Await.result(sender2, 1.seconds)
+
+   assert(OneColType.equals(collector.getResultType))
+   val iter = collector.getResult
+   val set = scala.collection.mutable.Set[Int]()
+   // reading the results must fail with a RuntimeException before the loop completes
+   intercept[RuntimeException] {
+     (1 to total).foreach { _ =>
+       assert(iter.hasNext)
+       val o = iter.next()
+       assert(o.size == 1)
+       assert(! set.contains(o(0).asInstanceOf[Int]))
+       set.add(o(0).asInstanceOf[Int])
+     }
+   }
+ }
+
+ test("transfer data with Exception") {
+   // the iterator throws after its 200 rows (genExcp = true)
+   val failing = intIterator(1, 200, genExcp = true)
+   new StructStreamingResultSender(baseSender, OneColType, failing).send()
+   val received = collector.getResult
+   // draining the collected results must raise a RuntimeException
+   intercept[RuntimeException] {
+     received.foreach(row => row.mkString(","))
+   }
+ }
+
+ /**
+  * Maps the rows of `intIterator(1, n, genExcp)` to two-column rows
+  * ("key-<i>", "value-<i>").
+  */
+ def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
+   intIterator(1, n, genExcp).map { row =>
+     val id = row(0)
+     Array[Object](s"key-$id", s"value-$id")
+   }
+
+ test("transfer string pair data with 200 rows") {
+   // NOTE(review): the test name says 200 rows, but 1000 rows are actually sent and checked
+   val rowCount = 1000
+   val sender = new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(rowCount, genExcp = false))
+   sender.send()
+
+   assert(TwoColType.equals(collector.getResultType))
+   val received = collector.getResult
+   for (i <- 1 to rowCount) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 2)
+     assert(row(0) == s"key-$i")
+     assert(row(1) == s"value-$i")
+   }
+   assert(! received.hasNext)
+ }
+
+ /**
+ * Usage notes: There are 3 kinds of data to transfer:
+ * (1) object, (2) byte array of serialized object, and (3) byte array.
+ * This test shows how to round-trip each of them through DataSerializer,
+ * reusing one output buffer and one input buffer for all three cases.
+ */
+ test("DataSerializer usage") {
+ val outBuf = new HeapDataOutputStream(1024, null)
+ val inBuf = new ByteArrayDataInput()
+
+ // 1. a regular object
+ val hello = "Hello World!" * 30
+ // serialize the data
+ DataSerializer.writeObject(hello, outBuf)
+ // bytesHello (the serialized form of `hello`) is reused as input for cases 2 and 3
+ val bytesHello = outBuf.toByteArray.clone()
+ // de-serialize the data
+ inBuf.initialize(bytesHello, Version.CURRENT)
+ val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+ assert(hello == hello2)
+
+ // 2. byte array of serialized object
+ // serialize: byte array from `CachedDeserializable`
+ val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
+ // reset outBuf before it is written again
+ outBuf.reset()
+ DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
+ // de-serialize the data in 2 steps: first read the byte array, then the object inside it
+ inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+ val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
+ inBuf.initialize(bytesHello2, Version.CURRENT)
+ val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+ assert(hello == hello3)
+
+ // 3. byte array: written and read back as-is, compared element by element
+ outBuf.reset()
+ DataSerializer.writeByteArray(bytesHello, outBuf)
+ inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+ val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
+ assert(bytesHello sameElements bytesHello3)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
new file mode 100644
index 0000000..54394e8
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import org.scalatest.FunSuite
+
+/**
+ * Tests for `QueryParser.parseOQL`: every case parses one OQL query and compares the
+ * parse result with the expected rendering of the sources named in its FROM clause.
+ * Each test is named after the query it parses.
+ */
+class QueryParserTest extends FunSuite {
+
+  /**
+   * Registers one test, named after `query`, asserting that parsing `query` yields `expected`.
+   * `.get` fails the test with an exception if the query cannot be parsed.
+   */
+  private def checkRegions(query: String, expected: String): Unit =
+    test(query) {
+      val r = QueryParser.parseOQL(query).get
+      assert(r == expected)
+    }
+
+  // plain selects on a single region
+  checkRegions("select * from /r1", "List(/r1)")
+  checkRegions("select c2 from /r1", "List(/r1)")
+  checkRegions("select key, value from /r1.entries", "List(/r1.entries)")
+
+  // where clauses and sub-regions
+  checkRegions("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2", "List(/r1)")
+  checkRegions("select * from /r1/r2 where c1 >= 200", "List(/r1/r2)")
+
+  // imports, mixed-case keywords, distinct and order by
+  checkRegions(
+    "import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100",
+    "List(/r1/r2, /r3/r4)")
+  checkRegions("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100", "List(/r1/r2)")
+  checkRegions(
+    "IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc",
+    "List(/root/sub.entries)")
+
+  // aliased regions
+  checkRegions(
+    "select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status",
+    "List(/region)")
+  checkRegions(
+    "SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID",
+    "List(/QueryRegion1, /QueryRegion2)")
+
+  // quoted identifiers and a dependent (non-region) source in the from clause
+  checkRegions(
+    "SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'",
+    "List(/obj_obj_region)")
+  checkRegions(
+    "SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'",
+    "List(/obj_obj_region, r.positions.values)")
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
new file mode 100644
index 0000000..b0464cc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector._
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.Matchers
+
+/** Checks the implicit conversions brought into scope by `io.pivotal.geode.spark.connector._`. */
+class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
+
+  /** Converts `map` implicitly to java.util.Properties and checks its size and every entry. */
+  def verifyProperties(map: Map[String, String]): Unit = {
+    val converted: java.util.Properties = map
+    assert(converted.size() == map.size)
+    for (entry <- map) {
+      assert(converted.getProperty(entry._1) == entry._2)
+    }
+  }
+
+  test("implicit map2Properties") {
+    verifyProperties(Map.empty)
+    verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3"))
+  }
+
+  test("Test Implicit SparkContext Conversion") {
+    val sparkContext = mock[SparkContext]
+    // the type ascription is what triggers the implicit conversion
+    val functions: GeodeSparkContextFunctions = sparkContext
+    assert(functions.isInstanceOf[GeodeSparkContextFunctions])
+  }
+
+  test("Test Implicit SQLContext Conversion") {
+    val sqlContext = mock[SQLContext]
+    // the type ascription is what triggers the implicit conversion
+    val functions: GeodeSQLContextFunctions = sqlContext
+    assert(functions.isInstanceOf[GeodeSQLContextFunctions])
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
new file mode 100644
index 0000000..a3076f4
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import io.pivotal.geode.spark.connector._
+
+/**
+ * Tests for building a GeodeConnectionConf from a SparkConf, from a locator string,
+ * and through its constructor, plus its delegation of getConnection to the implicit
+ * GeodeConnectionManager.
+ */
+class GeodeConnectionConfTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("apply(SparkConf) w/ GeodeLocator property and empty geodeProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    // locators given in the "host[port]" form
+    val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]")
+    val connConf = GeodeConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps.isEmpty)
+  }
+
+  test("apply(SparkConf) w/ GeodeLocator property and geode properties") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    // properties are set with the "spark.geode." prefix and expected back without it
+    val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]")
+      .set(s"spark.geode.$propK1", propV1).set(s"spark.geode.$propK2", propV2)
+    val connConf = GeodeConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps == Map(propK1 -> propV1, propK2 -> propV2))
+  }
+
+  test("apply(SparkConf) w/o GeodeLocator property") {
+    intercept[RuntimeException] { GeodeConnectionConf(new SparkConf()) }
+  }
+
+  test("apply(SparkConf) w/ invalid GeodeLocator property") {
+    val conf = new SparkConf().set(GeodeLocatorPropKey, "local^host:1234")
+    intercept[Exception] { GeodeConnectionConf(conf) }
+  }
+
+  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non geodeProps") {
+    val (host1, port1) = ("host1", 1234)
+    // locator given in the "host:port" form
+    val connConf = GeodeConnectionConf(s"$host1:$port1")
+    assert(connConf.locators == Seq((host1, port1)))
+    assert(connConf.geodeProps.isEmpty)
+  }
+
+  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non-empty geodeProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    val props = Map(propK1 -> propV1, propK2 -> propV2)
+    val connConf = GeodeConnectionConf(s"$host1:$port1,$host2:$port2", props)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps == props)
+  }
+
+  test("apply(locatorStr, geodeProps) w/ invalid locatorStr") {
+    intercept[Exception] { GeodeConnectionConf("local~host:4321") }
+  }
+
+  test("constructor w/ empty (host,port) pairs") {
+    intercept[IllegalArgumentException] { new GeodeConnectionConf(Seq.empty) }
+  }
+
+  test("getConnection() normal") {
+    // implicit definitions carry an explicit type so implicit resolution does not depend on inference
+    implicit val mockFactory: GeodeConnectionManager = mock[GeodeConnectionManager]
+    val mockConnection = mock[GeodeConnection]
+    when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenReturn(mockConnection)
+    val connConf = GeodeConnectionConf("localhost:1234")
+    assert(connConf.getConnection == mockConnection)
+    // the conf must hand itself to the manager
+    verify(mockFactory).getConnection(connConf)
+  }
+
+  test("getConnection() failure") {
+    implicit val mockFactory: GeodeConnectionManager = mock[GeodeConnectionManager]
+    when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenThrow(new RuntimeException)
+    val connConf = GeodeConnectionConf("localhost:1234")
+    // the manager's failure must propagate to the caller
+    intercept[RuntimeException] { connConf.getConnection }
+    verify(mockFactory).getConnection(connConf)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
new file mode 100644
index 0000000..bcba7e1
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+import scala.reflect.ClassTag
+
+/**
+ * Tests for the DStream implicit conversions in `io.pivotal.geode.spark.connector.streaming`
+ * and for the calls that saveToGeode makes on the connection conf, the connection and the DStream.
+ */
+class GeodeDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("test GeodePairDStreamFunctions Implicit") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val mockDStream = mock[DStream[(Int, String)]]
+    // the implicit conversion imported above makes the following line valid
+    val pairDStream: GeodePairDStreamFunctions[Int, String] = mockDStream
+    pairDStream shouldBe a[GeodePairDStreamFunctions[_, _]]
+  }
+
+  test("test GeodeDStreamFunctions Implicit") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val mockDStream = mock[DStream[String]]
+    // the implicit conversion imported above makes the following line valid
+    val dstream: GeodeDStreamFunctions[String] = mockDStream
+    dstream shouldBe a[GeodeDStreamFunctions[_]]
+  }
+
+  /**
+   * Creates the mocks shared by the saveToGeode tests.
+   *
+   * @param regionPath returned unchanged as the first tuple element
+   * @return (regionPath, mocked conf, mocked connection, mocked region); the conf is stubbed
+   *         to return the mocked connection and an empty locator list
+   */
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
+    : (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
+    val mockConnection = mock[GeodeConnection]
+    val mockConnConf = mock[GeodeConnectionConf]
+    val mockRegion = mock[Region[K, V]]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnConf.locators).thenReturn(Seq.empty)
+    (regionPath, mockConnConf, mockConnection, mockRegion)
+  }
+
+  test("test GeodePairDStreamFunctions.saveToGeode()") {
+    import io.pivotal.geode.spark.connector.streaming._
+    // the mocked region is not needed by this test
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, String]("test")
+    val mockDStream = mock[DStream[(String, String)]]
+    mockDStream.saveToGeode(regionPath, mockConnConf)
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
+  }
+
+  test("test GeodeDStreamFunctions.saveToGeode()") {
+    import io.pivotal.geode.spark.connector.streaming._
+    // the mocked region is not needed by this test
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, Int]("test")
+    val mockDStream = mock[DStream[String]]
+    mockDStream.saveToGeode[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf)
+    verify(mockConnConf).getConnection
+    // the region is validated with the key/value types being saved, i.e. [String, Int]
+    verify(mockConnection).validateRegion[String, Int](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
new file mode 100644
index 0000000..96e5f26
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector._
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDWriter, GeodePairRDDWriter}
+import org.apache.spark.{TaskContext, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import collection.JavaConversions._
+import scala.reflect.ClassTag
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+/**
+ * Unit tests for the RDD "save to Geode" support: the implicit enrichment of RDDs, the two RDD
+ * writers, and the `saveToGeode` entry points. All collaborators are Mockito mocks.
+ */
+class GeodeRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("test PairRDDFunction Implicit") {
+    import io.pivotal.geode.spark.connector._
+    val mockRDD = mock[RDD[(Int, String)]]
+    // the ascription below only compiles when an implicit view to GeodePairRDDFunctions applies
+    val pairRDD: GeodePairRDDFunctions[Int, String] = mockRDD
+    pairRDD shouldBe a [GeodePairRDDFunctions[_, _]]
+  }
+
+  test("test RDDFunction Implicit") {
+    import io.pivotal.geode.spark.connector._
+    val mockRDD = mock[RDD[String]]
+    // the ascription below only compiles when an implicit view to GeodeRDDFunctions applies
+    val nonPairRDD: GeodeRDDFunctions[String] = mockRDD
+    nonPairRDD shouldBe a [GeodeRDDFunctions[_]]
+  }
+
+  /**
+   * Builds the mocks shared by the tests below.
+   *
+   * The connection conf is stubbed to hand out the mocked connection, and the connection is
+   * stubbed to return the mocked region as the region proxy for `regionPath`.
+   *
+   * @param regionPath region path the region-proxy stub is registered for
+   * @return (regionPath, mocked connection conf, mocked connection, mocked region)
+   */
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
+    val mockConnection = mock[GeodeConnection]
+    val mockConnConf = mock[GeodeConnectionConf]
+    val mockRegion = mock[Region[K, V]]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
+    (regionPath, mockConnConf, mockConnection, mockRegion)
+  }
+
+  test("test GeodePairRDDWriter") {
+    val (regionPath, mockConnConf, _, mockRegion) = createMocks[String, String]("test")
+    val writer = new GeodePairRDDWriter[String, String](regionPath, mockConnConf)
+    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
+    // first argument (presumably the TaskContext) is not needed here, so null is passed
+    writer.write(null, data.toIterator)
+    val expectedMap: Map[String, String] = data.toMap
+    // all pairs are expected to reach the region in a single putAll call
+    verify(mockRegion).putAll(expectedMap)
+  }
+
+  test("test GeodeNonPairRDDWriter") {
+    val (regionPath, mockConnConf, _, mockRegion) = createMocks[Int, String]("test")
+    val writer = new GeodeRDDWriter[String, Int, String](regionPath, mockConnConf)
+    val data = List("a", "ab", "abc")
+    // maps each element to a (length, element) key/value pair
+    val f: String => (Int, String) = s => (s.length, s)
+    writer.write(f)(null, data.toIterator)
+    val expectedMap: Map[Int, String] = data.map(f).toMap
+    verify(mockRegion).putAll(expectedMap)
+  }
+
+  test("test PairRDDFunctions.saveToGeode") {
+    verifyPairRDDFunction(useOpConf = false)
+  }
+
+  test("test PairRDDFunctions.saveToGeode w/ opConf") {
+    verifyPairRDDFunction(useOpConf = true)
+  }
+
+  /**
+   * Calls `saveToGeode` on a mocked pair RDD and verifies that the region is validated once and
+   * that exactly one Spark job is submitted for the RDD.
+   *
+   * @param useOpConf when true, an operation config with a save batch size is passed as well
+   */
+  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
+    import io.pivotal.geode.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, String]("test")
+    val mockRDD = mock[RDD[(String, String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    // the call's result is not inspected; the interactions verified below are what is pinned
+    if (useOpConf)
+      mockRDD.saveToGeode(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
+    else
+      mockRDD.saveToGeode(regionPath, mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
+    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code not compilable,
+    // so there is no negative test for this case
+    // val rdd: RDD[(K, V)] = ...
+    // rdd.saveToGeode(regionPath, s => (s.length, s))
+  }
+
+  test("test RDDFunctions.saveToGeode") {
+    verifyRDDFunction(useOpConf = false)
+  }
+
+  test("test RDDFunctions.saveToGeode w/ opConf") {
+    verifyRDDFunction(useOpConf = true)
+  }
+
+  /**
+   * Calls `saveToGeode` with a mapping function on a mocked non-pair RDD and verifies that the
+   * region is validated once and that exactly one Spark job is submitted for the RDD.
+   *
+   * @param useOpConf when true, an operation config with a save batch size is passed as well
+   */
+  def verifyRDDFunction(useOpConf: Boolean): Unit = {
+    import io.pivotal.geode.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[Int, String]("test")
+    val mockRDD = mock[RDD[(String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    // the call's result is not inspected; the interactions verified below are what is pinned
+    if (useOpConf)
+      mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
+    else
+      mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
+    verify(mockSparkContext, times(1)).runJob[String, Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code not compilable,
+    // so there is no negative test for this case
+    // val rdd: RDD[T] = ... // T is not a (K, V) tuple
+    // rdd.saveToGeode(regionPath)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
new file mode 100644
index 0000000..c775784
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import java.net.InetAddress
+
+import io.pivotal.geode.spark.connector.internal.LocatorHelper
+import org.scalatest.FunSuite
+
+/**
+ * Unit tests for `LocatorHelper`: parsing of single locator strings ("host:port" and
+ * "host[port]"), parsing of comma-separated locator lists, and selection of preferred servers.
+ */
+class LocatorHelperTest extends FunSuite {
+
+  test("locatorStr2HostPortPair hostname w/o domain") {
+    val (host, port) = ("localhost", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get == ((host, port)))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get == ((host, port)))
+  }
+
+  test("locatorStr2HostPortPair hostname w/ domain") {
+    // fully-qualified host name, so this case differs from the "w/o domain" test above
+    val (host, port) = ("localhost.mycompany.com", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get == ((host, port)))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get == ((host, port)))
+  }
+
+  test("locatorStr2HostPortPair w/ invalid host name") {
+    // empty or null locatorStr
+    assert(LocatorHelper.locatorStr2HostPortPair("").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure)
+    // Separator and port are valid in every case below, so only the host name can be rejected.
+    // host name has leading `.`
+    assert(LocatorHelper.locatorStr2HostPortPair(".localhost:1234").isFailure)
+    // host name has leading and/or tail white space
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost :1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost :1234").isFailure)
+    // host name contain invalid characters
+    assert(LocatorHelper.locatorStr2HostPortPair("local%host:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost*:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("^localhost:1234").isFailure)
+  }
+
+  test("locatorStr2HostPortPair w/ valid port") {
+    val host = "192.168.0.1"
+    // port has 2, 3, 4, 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get == ((host, 20)))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get == ((host, 300)))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get == ((host, 4000)))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get == ((host, 50000)))
+  }
+
+  test("locatorStr2HostPortPair w/ invalid port") {
+    // Host and separator are valid in every case below, so only the port can be rejected.
+    // port number is less than 2 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:9").isFailure)
+    // port number is more than 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:100000").isFailure)
+    // port number is invalid
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:1xx1").isFailure)
+  }
+
+  test("parseLocatorsString with valid locator(s)") {
+    val (host1, port1) = ("localhost", 10334)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, port1)))
+    val (host2, port2) = ("localhost2", 10335)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") == Seq((host1, port1), (host2, port2)))
+    val (host3, port3) = ("localhost3", 10336)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3") ==
+      Seq((host1, port1), (host2, port2), (host3, port3)))
+  }
+
+  test("parseLocatorsString with invalid locator(s)") {
+    // empty and null locatorsStr
+    intercept[Exception] { LocatorHelper.parseLocatorsString("") }
+    intercept[Exception] { LocatorHelper.parseLocatorsString(null) }
+    // 1 bad locatorStr
+    intercept[Exception] { LocatorHelper.parseLocatorsString("local%host:1234") }
+    // 1 good locatorStr and 1 bad locatorStr (bad one last, then bad one first)
+    intercept[Exception] { LocatorHelper.parseLocatorsString("localhost:2345,local%host:1234") }
+    intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost:1234") }
+  }
+
+  test("pickPreferredGeodeServers: shared servers and one gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003), ("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGeodeServers: shared servers, one gf-server per host, un-sorted list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003), ("host4", 4004))
+    val servers = Seq(srv4, srv2, srv3, srv1)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGeodeServers: shared servers and two gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+  }
+
+  test("pickPreferredGeodeServers: shared servers, two gf-server per host, un-sorted server list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv4, srv3, srv2)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
+  }
+
+  test("pickPreferredGeodeServers: no shared servers and one gf-server per host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003), ("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv3, srv4))
+    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv3, srv4, srv1))
+    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv4, srv1, srv2))
+  }
+
+  test("pickPreferredGeodeServers: no shared servers, one gf-server per host, and less gf-server") {
+    val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
+    val servers = Seq(srv1, srv2)
+    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, srv2))
+    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv1))
+    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv1, srv2))
+    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv2, srv1))
+  }
+
+  test("pickPreferredGeodeServers: ad-hoc") {
+    val (srv4, srv5, srv6) = (
+      ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411))
+    val servers = Seq(srv6, srv5, srv4)
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5))
+  }
+
+  /**
+   * Asserts that `LocatorHelper.pickPreferredGeodeServers` returns `expectation` (order matters)
+   * for the given server list, host name and executor id.
+   *
+   * @param servers     available servers as (host, port) pairs
+   * @param hostName    host name the servers are picked for
+   * @param executorId  executor id the servers are picked for
+   * @param expectation expected servers, in the expected order
+   */
+  def verifyPickPreferredGeodeServers(
+    servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = {
+    val result = LocatorHelper.pickPreferredGeodeServers(servers, hostName, executorId)
+    assert(result == expectation, s"pick servers for $hostName:$executorId")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
new file mode 100644
index 0000000..f53c178
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector.rdd
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.rdd._
+import org.apache.spark.Partition
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+
+import java.util.{HashSet => JHashSet, HashMap => JHashMap}
+
+import scala.collection.mutable
+
+/**
+ * Unit tests for the Geode RDD partitioners: default partitioner selection, lookup by name,
+ * `OnePartitionPartitioner`, and bucket-to-partition assignment by `ServerSplitsPartitioner`.
+ */
+class GeodeRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar {
+
+  val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap()
+
+  /**
+   * Converts a Scala (host, port) -> bucket-id-set map into the Java map type that
+   * `RegionMetadata` is constructed with in the tests below.
+   */
+  def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = {
+    val result = new JHashMap[ServerLocation, JHashSet[Integer]]()
+    for (((host, port), buckets) <- map) {
+      val javaBuckets = new JHashSet[Integer]()
+      buckets.foreach(id => javaBuckets.add(Integer.valueOf(id)))
+      result.put(new ServerLocation(host, port), javaBuckets)
+    }
+    result
+  }
+
+  // NOTE(review): not referenced by any test in this class (each test declares its own local
+  // `map`); kept so the members of the class are unchanged
+  val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map(
+    ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5))
+
+
+  // update this test whenever the default setting changes
+  test("default partitioned region partitioner") {
+    assert(GeodeRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner)
+  }
+
+  // update this test whenever the default setting changes
+  test("default replicated region partitioner") {
+    assert(GeodeRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner)
+  }
+
+  test("GeodeRDDPartitioner.apply method") {
+    import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
+    // every registered name resolves to its partitioner
+    for ((name, partitioner) <- partitioners) assert(GeodeRDDPartitioner(name) == partitioner)
+    // an unknown name and a missing name both resolve to the partitioned-region default
+    assert(GeodeRDDPartitioner("dummy") == GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
+    assert(GeodeRDDPartitioner() == GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
+  }
+
+  test("OnePartitionPartitioner") {
+    val mockConnection = mock[GeodeConnection]
+    val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  /** Asserts that `partitions` holds exactly one GeodeRDDPartition with index 0 and no buckets. */
+  def verifySinglePartition(partitions: Array[Partition]): Unit = {
+    assert(1 == partitions.size)
+    assert(partitions(0).index === 0)
+    assert(partitions(0).isInstanceOf[GeodeRDDPartition])
+    assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty)
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2)
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3)
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")),
+      (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) ))
+  }
+
+  test("ServerSplitsPartitioner.partitions(): metadata = None ") {
+    val mockConnection = mock[GeodeConnection]
+    // null metadata is passed; any RuntimeException satisfies this check
+    intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) }
+  }
+
+  test("ServerSplitsPartitioner.partitions(): replicated region ") {
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val md = new RegionMetadata(regionPath, false, 11, null)
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") {
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap)
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") {
+    import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val map: Map[(String, Int), Set[Int]] = Map(
+      ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), ("s3",4) -> Set(3, 4, 5))
+    val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map))
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2"))
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
+  }
+
+  /**
+   * Checks `partitions` against the expected (bucket set, locations) pairs.
+   *
+   * Since the order of partitions is not pre-determined, partition ids and partition contents
+   * are verified separately: ids must be 0 to n-1 in array order, and every expected pair must
+   * occur somewhere among the partitions.
+   */
+  def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = {
+    // 1. check size
+    assert(partitions.size == expPartitions.size)
+    // 2. check IDs are 0 to n-1
+    partitions.zipWithIndex.foreach { case (p, id) => assert(id == p.index) }
+
+    // 3. get all pairs of bucket set and its locations, and compare to the expected pairs
+    val list = partitions.map { e =>
+      val p = e.asInstanceOf[GeodeRDDPartition]
+      (p.bucketSet, p.locations)
+    }
+    expPartitions.foreach(e => assert(list.contains(e)))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
new file mode 100644
index 0000000..046ceac
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, GeodeRegionRDD}
+import io.pivotal.geode.spark.connector.{GeodeConnectionConf, GeodeConnection}
+import org.apache.spark.{TaskContext, Partition, SparkContext}
+import org.mockito.Mockito._
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+
+import scala.reflect.ClassTag
+
+/**
+ * Unit tests for `GeodeRegionRDD`: creation against a missing region, partition computation
+ * for replicated and partitioned regions, and `compute()`. All collaborators are Mockito mocks.
+ */
+class GeodeRegionRDDTest extends FunSuite with Matchers with MockitoSugar {
+
+  /**
+   * Create common mocks, not all mocks are used by all tests.
+   *
+   * The connection conf is stubbed to hand out the mocked connection and an empty locator list;
+   * the connection is stubbed to return the mocked region as the proxy for `regionPath`.
+   *
+   * @return (regionPath, mocked region, mocked connection conf, mocked connection)
+   */
+  def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
+    : (String, Region[K,V], GeodeConnectionConf, GeodeConnection) = {
+    val mockConnection = mock[GeodeConnection]
+    val mockRegion = mock[Region[K, V]]
+    val mockConnConf = mock[GeodeConnectionConf]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
+    when(mockConnConf.locators).thenReturn(Seq.empty)
+    (regionPath, mockRegion, mockConnConf, mockConnection)
+  }
+
+  test("create GeodeRDD with non-existing region") {
+    // getConnection is already stubbed by createMocks
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException)
+    val mockSparkContext = mock[SparkContext]
+    intercept[RuntimeException] { GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) }
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+  }
+
+  test("getPartitions with non-existing region") {
+    // region exists when RDD is created, but get removed before getPartitions() is invoked
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None)
+    val mockSparkContext = mock[SparkContext]
+    intercept[RuntimeException] { GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions }
+  }
+
+  test("getPartitions with replicated region and not preferred env") {
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    implicit val mockConnConf2: GeodeConnectionConf = mockConnConf
+    val mockSparkContext = mock[SparkContext]
+    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
+    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
+    verifySinglePartition(partitions)
+  }
+
+  /** Asserts that `partitions` holds exactly one GeodeRDDPartition with index 0 and no buckets. */
+  def verifySinglePartition(partitions: Array[Partition]): Unit = {
+    assert(1 == partitions.size)
+    assert(partitions(0).index === 0)
+    assert(partitions(0).isInstanceOf[GeodeRDDPartition])
+    assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty)
+  }
+
+  test("getPartitions with replicated region and preferred OnePartitionPartitioner") {
+    // since it's replicated region, so OnePartitionPartitioner will be used, i.e., override preferred partitioner
+    import io.pivotal.geode.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName}
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
+    implicit val mockConnConf2: GeodeConnectionConf = mockConnConf
+    val mockSparkContext = mock[SparkContext]
+    val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName)
+    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions
+    verifySinglePartition(partitions)
+  }
+
+  test("getPartitions with partitioned region and not preferred env") {
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    implicit val mockConnConf2: GeodeConnectionConf = mockConnConf
+    val mockSparkContext = mock[SparkContext]
+    // partitioned-region metadata with a null server/bucket map
+    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
+    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
+    verifySinglePartition(partitions)
+  }
+
+  test("GeodeRDD.compute() method") {
+    val (regionPath, _, mockConnConf, mockConnection) = createMocks[String, String]("test")
+    implicit val mockConnConf2: GeodeConnectionConf = mockConnConf
+    val mockIter = mock[Iterator[(String, String)]]
+    val partition = GeodeRDDPartition(0, Set.empty)
+    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
+    when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter)
+    val mockSparkContext = mock[SparkContext]
+    val rdd = GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf)
+    val partitions = rdd.partitions
+    assert(1 == partitions.size)
+    val mockTaskContext = mock[TaskContext]
+    rdd.compute(partitions(0), mockTaskContext)
+    // compute() must read the region data for exactly this partition, with no where-clause
+    verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java
new file mode 100644
index 0000000..03e15a0
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo;
+
+import java.io.Serializable;
+
+/**
+ * Demo class describing an employee record; it is referenced from the connector
+ * documentation. Instances are compared field by field.
+ */
+public class Emp implements Serializable {
+
+  private int id;
+
+  private String lname;
+
+  private String fname;
+
+  private int age;
+
+  private String loc;
+
+  /**
+   * Creates an employee record from its five attributes.
+   */
+  public Emp(int id, String lname, String fname, int age, String loc) {
+    this.id = id;
+    this.lname = lname;
+    this.fname = fname;
+    this.age = age;
+    this.loc = loc;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public String getLname() {
+    return lname;
+  }
+
+  public String getFname() {
+    return fname;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String getLoc() {
+    return loc;
+  }
+
+  /**
+   * Renders the record as {@code Emp(id, lname, fname, age, loc)}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("Emp(");
+    text.append(id).append(", ");
+    text.append(lname).append(", ");
+    text.append(fname).append(", ");
+    text.append(age).append(", ");
+    text.append(loc).append(")");
+    return text.toString();
+  }
+
+  /**
+   * Two records are equal when they have the same runtime class and all five
+   * fields match; string fields may be null.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    Emp that = (Emp) o;
+    return age == that.age
+        && id == that.id
+        && sameText(fname, that.fname)
+        && sameText(lname, that.lname)
+        && sameText(loc, that.loc);
+  }
+
+  /**
+   * Combines the fields with the usual 31-multiplier scheme, in the order
+   * id, lname, fname, age, loc; null strings contribute 0.
+   */
+  @Override
+  public int hashCode() {
+    int hash = id;
+    hash = 31 * hash + textHash(lname);
+    hash = 31 * hash + textHash(fname);
+    hash = 31 * hash + age;
+    hash = 31 * hash + textHash(loc);
+    return hash;
+  }
+
+  /** Null-safe string equality: two nulls are equal, a null never equals a non-null. */
+  private static boolean sameText(String left, String right) {
+    return left == null ? right == null : left.equals(right);
+  }
+
+  /** Null-safe string hash: 0 for null, otherwise {@link String#hashCode()}. */
+  private static int textHash(String text) {
+    return text == null ? 0 : text.hashCode();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
new file mode 100644
index 0000000..adcf072
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+
+
+/**
+ * This Spark application demonstrates how to get region data from Geode using Geode
+ * OQL Java API. The result is a Spark DataFrame.
+ * <p>
+ * In order to run it, you will need to start a Geode cluster, and run demo PairRDDSaveJavaDemo
+ * first to create some data in the region.
+ * <p>
+ * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
+ * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/.
+ * Then run the following command to start a Spark job:
+ * <pre>
+ *   &lt;path to spark&gt;/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \
+ *       &lt;path to&gt;/basic-demos_2.10-0.5.0.jar &lt;locator host&gt;:&lt;port&gt;
+ * </pre>
+ */
+public class OQLJavaDemo {
+
+  /**
+   * Runs an OQL query against the region {@code /str_str_region} and prints the
+   * resulting DataFrame.
+   *
+   * @param argv exactly one element: the Geode locator(s) to connect to; when the
+   *             count is wrong a usage line is printed and the method returns
+   */
+  public static void main(String[] argv) {
+
+    if (argv.length != 1) {
+      System.err.printf("Usage: OQLJavaDemo <locators>\n");
+      return;
+    }
+
+    SparkConf conf = new SparkConf().setAppName("OQLJavaDemo");
+    conf.set(GeodeLocatorPropKey, argv[0]); // "192.168.1.47[10335]"
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    try {
+      SQLContext sqlContext = new SQLContext(sc);
+      DataFrame df = javaFunctions(sqlContext).geodeOQL("select * from /str_str_region");
+      System.out.println("======= DataFrame =======\n");
+      df.show();
+    } finally {
+      // Stop the Spark context even when the query or show() fails, so it is not leaked.
+      sc.stop();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
new file mode 100644
index 0000000..52d2a99
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+import java.util.*;
+
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+
+/**
+ * This Spark application demonstrates how to save a RDD to Geode using Geode Spark
+ * Connector with Java.
+ * <p>
+ * In order to run it, you will need to start Geode cluster, and create the following region
+ * with GFSH:
+ * <pre>
+ * gfsh&gt; create region --name=str_str_region --type=REPLICATE \
+ *         --key-constraint=java.lang.String --value-constraint=java.lang.String
+ * </pre>
+ *
+ * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
+ * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/.
+ * Then run the following command to start a Spark job:
+ * <pre>
+ *   &lt;path to spark&gt;/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \
+ *       &lt;path to&gt;/basic-demos_2.10-0.5.0.jar &lt;locator host&gt;:&lt;port&gt;
+ * </pre>
+ *
+ * Verify the data was saved to Geode with GFSH:
+ * <pre>gfsh&gt; query --query="select * from /str_str_region.entrySet"  </pre>
+ */
+public class PairRDDSaveJavaDemo {
+
+  /**
+   * Saves two small lists of string pairs to the region {@code str_str_region},
+   * once via a {@code JavaPairRDD} built directly and once via a converted
+   * {@code JavaRDD} of tuples.
+   *
+   * @param argv exactly one element: the Geode locator(s) to connect to; when the
+   *             count is wrong a usage line is printed and the method returns
+   */
+  public static void main(String[] argv) {
+
+    if (argv.length != 1) {
+      System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n");
+      return;
+    }
+
+    SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo");
+    conf.set(GeodeLocatorPropKey, argv[0]);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    try {
+      GeodeConnectionConf connConf = GeodeConnectionConf.apply(conf);
+
+      List<Tuple2<String, String>> data = new ArrayList<>();
+      data.add(new Tuple2<>("7", "seven"));
+      data.add(new Tuple2<>("8", "eight"));
+      data.add(new Tuple2<>("9", "nine"));
+
+      List<Tuple2<String, String>> data2 = new ArrayList<>();
+      data2.add(new Tuple2<>("11", "eleven"));
+      data2.add(new Tuple2<>("12", "twelve"));
+      data2.add(new Tuple2<>("13", "thirteen"));
+
+      // method 1: generate JavaPairRDD directly
+      JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data);
+      javaFunctions(rdd1).saveToGeode("str_str_region", connConf);
+
+      // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V>
+      JavaRDD<Tuple2<String, String>> rdd2 = sc.parallelize(data2);
+      javaFunctions(toJavaPairRDD(rdd2)).saveToGeode("str_str_region", connConf);
+    } finally {
+      // Stop the Spark context even when a save fails, so it is not leaked.
+      sc.stop();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
new file mode 100644
index 0000000..1125de5
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+
+/**
+ * This Spark application demonstrates how to save a RDD to Geode using Geode Spark
+ * Connector with Java.
+ * <p>
+ * In order to run it, you will need to start Geode cluster, and create the following region
+ * with GFSH:
+ * <pre>
+ * gfsh&gt; create region --name=str_int_region --type=REPLICATE \
+ *         --key-constraint=java.lang.String --value-constraint=java.lang.Integer
+ * </pre>
+ *
+ * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
+ * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/.
+ * Then run the following command to start a Spark job:
+ * <pre>
+ *   &lt;path to spark&gt;/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \
+ *       &lt;path to&gt;/basic-demos_2.10-0.5.0.jar &lt;locator host&gt;:&lt;port&gt;
+ * </pre>
+ *
+ * Verify the data was saved to Geode with GFSH:
+ * <pre>gfsh&gt; query --query="select * from /str_int_region.entrySet"  </pre>
+ */
+public class RDDSaveJavaDemo {
+
+  /**
+   * Saves a small RDD of strings to the region {@code str_int_region}, mapping
+   * each string to a (string, length) pair.
+   *
+   * @param argv exactly one element: the Geode locator(s) to connect to; when the
+   *             count is wrong a usage line is printed and the method returns
+   */
+  public static void main(String[] argv) {
+
+    if (argv.length != 1) {
+      System.err.printf("Usage: RDDSaveJavaDemo <locators>\n");
+      return;
+    }
+
+    SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo");
+    conf.set(GeodeLocatorPropKey, argv[0]);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    try {
+      List<String> data = new ArrayList<String>();
+      data.add("abcdefg");
+      data.add("abcdefgh");
+      data.add("abcdefghi");
+      JavaRDD<String> rdd = sc.parallelize(data);
+
+      GeodeConnectionConf connConf = GeodeConnectionConf.apply(conf);
+
+      // Turns each element into the key/value pair (string, string length).
+      PairFunction<String, String, Integer> func = new PairFunction<String, String, Integer>() {
+        @Override public Tuple2<String, Integer> call(String s) throws Exception {
+          return new Tuple2<String, Integer>(s, s.length());
+        }
+      };
+
+      javaFunctions(rdd).saveToGeode("str_int_region", func, connConf);
+    } finally {
+      // Stop the Spark context even when the save fails, so it is not leaked.
+      sc.stop();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
new file mode 100644
index 0000000..1ce8ceb
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+
+/**
+ * This Spark application demonstrates how to expose a region in Geode as a RDD using Geode
+ * Spark Connector with Java.
+ * <p>
+ * In order to run it, you will need to start Geode cluster, and run demo PairRDDSaveJavaDemo
+ * first to create some data in the region.
+ * <p>
+ * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
+ * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/.
+ * Then run the following command to start a Spark job:
+ * <pre>
+ *   &lt;path to spark&gt;/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \
+ *       &lt;path to&gt;/basic-demos_2.10-0.5.0.jar &lt;locator host&gt;:&lt;port&gt;
+ * </pre>
+ */
+public class RegionToRDDJavaDemo {
+
+  /**
+   * Exposes the region {@code str_str_region} as a pair RDD, collects it and
+   * prints the result.
+   *
+   * @param argv exactly one element: the Geode locator(s) to connect to; when the
+   *             count is wrong a usage line is printed and the method returns
+   */
+  public static void main(String[] argv) {
+
+    if (argv.length != 1) {
+      System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n");
+      return;
+    }
+
+    SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo");
+    conf.set(GeodeLocatorPropKey, argv[0]);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    try {
+      JavaPairRDD<String, String> rdd = javaFunctions(sc).geodeRegion("str_str_region");
+      System.out.println("=== geodeRegion =======\n" + rdd.collect() + "\n=========================");
+    } finally {
+      // Stop the Spark context even when reading the region fails, so it is not leaked.
+      sc.stop();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
new file mode 100644
index 0000000..810b380
--- /dev/null
+++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package demo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import io.pivotal.geode.spark.connector.GeodeLocatorPropKey
+import io.pivotal.geode.spark.connector.streaming._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * In order to run it, you will need to start Geode cluster, and create the following region
+ * with GFSH:
+ * {{{
+ * gfsh> create region --name=str_int_region --type=REPLICATE \
+ *         --key-constraint=java.lang.String --value-constraint=java.lang.Integer
+ * }}}
+ *
+ * To run this on your local machine, you need to first run a net cat server
+ * {{{
+ * $ nc -lk 9999
+ * }}}
+ * and then run the example
+ * {{{
+ * $ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port
+ * }}}
+ *
+ * Check the result that was saved to Geode with GFSH:
+ * {{{
+ * gfsh> query --query="select * from /str_int_region.entrySet"
+ * }}}
+ */
+object NetworkWordCount {
+
+  /**
+   * Starts the streaming job and blocks until it terminates.
+   *
+   * @param args `<hostname> <port> <geode locator>`; with fewer than three
+   *             arguments a usage line is printed and the JVM exits with status 1
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      System.err.println("Usage: NetworkWordCount <hostname> <port> <geode locator>")
+      System.exit(1)
+    }
+
+    // Adds the counts seen in the current batch to the running total kept as state.
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      val currentCount = values.sum
+      val previousCount = state.getOrElse(0)
+      Some(currentCount + previousCount)
+    }
+
+    // Create the context with a 1 second batch size
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GeodeLocatorPropKey, args(2))
+    val ssc = new StreamingContext(sparkConf, Seconds(1))
+    ssc.checkpoint(".")
+
+    // Create a socket stream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    val lines = ssc.socketTextStream(args(0), args(1).toInt)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
+    // runningCounts.print()
+    runningCounts.saveToGeode("str_int_region")
+    ssc.start()
+    ssc.awaitTermination()
+  }
+
+}