You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:32 UTC
[03/14] incubator-geode git commit: GEODE-37 change package name from
io.pivotal.geode (for ./geode-spark-connector/src/test/scala/io/pivotal) to
org.apache.geode (for ./geode-spark-connector/src/test/scala/org/apache)
GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/test/scala/io/pivotal) to org.apache.geode (for ./geode-spark-connector/src/test/scala/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/97658f04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/97658f04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/97658f04
Branch: refs/heads/develop
Commit: 97658f046e0c2f7a9f0a7eb366c07a91acb7650f
Parents: f530d3a
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700
----------------------------------------------------------------------
.../connector/GeodeFunctionDeployerTest.scala | 58 -----
.../DefaultGeodeConnectionManagerTest.scala | 82 ------
...tStreamingResultSenderAndCollectorTest.scala | 254 -------------------
.../internal/oql/QueryParserTest.scala | 83 ------
.../connector/GeodeFunctionDeployerTest.scala | 58 +++++
.../DefaultGeodeConnectionManagerTest.scala | 82 ++++++
...tStreamingResultSenderAndCollectorTest.scala | 254 +++++++++++++++++++
.../internal/oql/QueryParserTest.scala | 83 ++++++
8 files changed, 477 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
deleted file mode 100644
index 4e45dc2..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import org.apache.commons.httpclient.HttpClient
-import java.io.File
-
-
-class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
- val mockHttpClient: HttpClient = mock[HttpClient]
-
- test("jmx url creation") {
- val jmxHostAndPort = "localhost:7070"
- val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
- val gfd = new GeodeFunctionDeployer(mockHttpClient);
- val urlString = gfd.constructURLString(jmxHostAndPort)
- assert(urlString === expectedUrlString)
- }
-
- test("missing jar file") {
- val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
- val gfd = new GeodeFunctionDeployer(mockHttpClient);
- intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
- }
-
- test("deploy with missing jar") {
- val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
- val gfd = new GeodeFunctionDeployer(mockHttpClient);
- intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
- intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
- }
-
- test("successful mocked deploy") {
- val gfd = new GeodeFunctionDeployer(mockHttpClient);
- val jar = new File("README.md");
- assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
deleted file mode 100644
index 798912c..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-
-class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar {
-
- test("DefaultGeodeConnectionFactory get/closeConnection") {
- // note: connConf 1-4 share the same set of locators
- val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
- val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
- val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
- val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
- val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
-
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
- val mockConn1 = mock[DefaultGeodeConnection]
- val mockConn2 = mock[DefaultGeodeConnection]
- when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
- when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
-
- assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
- // note: following 3 lines do not trigger connFactory.newConnection(...)
- assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
- assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
- assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
- assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
-
- // connFactory.newConnection(...) were invoked only twice
- verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
- verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
- assert(DefaultGeodeConnectionManager.connections.size == 3)
-
- DefaultGeodeConnectionManager.closeConnection(connConf1)
- assert(DefaultGeodeConnectionManager.connections.size == 1)
- DefaultGeodeConnectionManager.closeConnection(connConf5)
- assert(DefaultGeodeConnectionManager.connections.isEmpty)
- }
-
- test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
- val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
- when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
- intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
- verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
- }
-
- test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
- val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
- val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
- val mockConn1 = mock[DefaultGeodeConnection]
- when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
- assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
- assert(DefaultGeodeConnectionManager.connections.size == 1)
- // connection does not exists in the connection manager
- DefaultGeodeConnectionManager.closeConnection(connConf2)
- assert(DefaultGeodeConnectionManager.connections.size == 1)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
deleted file mode 100644
index c95f1dc..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions
-
-import org.apache.geode.DataSerializer
-import org.apache.geode.cache.execute.{ResultCollector, ResultSender}
-import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
-import org.apache.geode.cache.query.types.ObjectType
-import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
-import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-
-class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter {
-
- /**
- * A test ResultSender that connects struct ResultSender and ResultCollector
- * Note: this test ResultSender has to copy the data (byte array) since the
- * StructStreamingResultSender will reuse the byte array.
- */
- class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
-
- var finishedNum = 0
-
- override def sendResult(result: Object): Unit =
- collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-
- /** exception should be sent via lastResult() */
- override def sendException(throwable: Throwable): Unit =
- throw new UnsupportedOperationException("sendException is not supported.")
-
- override def lastResult(result: Object): Unit = {
- collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
- this.synchronized {
- finishedNum += 1
- if (finishedNum == num)
- collector.endResults()
- }
- }
- }
-
- /** common variables */
- var collector: StructStreamingResultCollector = _
- var baseSender: LocalResultSender = _
- /** common types */
- val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
- val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
- val OneColType = new StructTypeImpl(Array("value"), Array(objType))
-
- before {
- collector = new StructStreamingResultCollector
- baseSender = new LocalResultSender(collector, 1)
- }
-
- test("transfer simple data") {
- verifySimpleTransfer(sendDataType = true)
- }
-
- test("transfer simple data with no type info") {
- verifySimpleTransfer(sendDataType = false)
- }
-
- def verifySimpleTransfer(sendDataType: Boolean): Unit = {
- val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
- val dataType = if (sendDataType) TwoColType else null
- new StructStreamingResultSender(baseSender, dataType , iter).send()
- // println("type: " + collector.getResultType.toString)
- assert(TwoColType.equals(collector.getResultType))
- val iter2 = collector.getResult
- (0 to 9).foreach { i =>
- assert(iter2.hasNext)
- val o = iter2.next()
- assert(o.size == 2)
- assert(o(0).asInstanceOf[Int] == i)
- assert(o(1).asInstanceOf[String] == i.toString * 5)
- }
- assert(! iter2.hasNext)
- }
-
-
- /**
- * A test iterator that generate integer data
- * @param start the 1st value
- * @param n number of integers generated
- * @param genExcp generate Exception if true. This is used to test exception handling.
- */
- def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
- new Iterator[Array[Object]] {
- val max = if (genExcp) start + n else start + n - 1
- var index: Int = start - 1
-
- override def hasNext: Boolean = if (index < max) true else false
-
- override def next(): Array[Object] =
- if (index < (start + n - 1)) {
- index += 1
- Array(index.asInstanceOf[Object])
- } else throw new RuntimeException("simulated error")
- }
- }
-
- test("transfer data with 0 row") {
- new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
- // println("type: " + collector.getResultType.toString)
- assert(collector.getResultType == null)
- val iter = collector.getResult
- assert(! iter.hasNext)
- }
-
- test("transfer data with 10K rows") {
- new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
- // println("type: " + collector.getResultType.toString)
- assert(OneColType.equals(collector.getResultType))
- val iter = collector.getResult
- // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
- (1 to 10000).foreach { i =>
- assert(iter.hasNext)
- val o = iter.next()
- assert(o.size == 1)
- assert(o(0).asInstanceOf[Int] == i)
- }
- assert(! iter.hasNext)
- }
-
- test("transfer data with 10K rows with 2 sender") {
- baseSender = new LocalResultSender(collector, 2)
- val total = 300
- val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
- val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
- Await.result(sender1, 1.seconds)
- Await.result(sender2, 1.seconds)
-
- // println("type: " + collector.getResultType.toString)
- assert(OneColType.equals(collector.getResultType))
- val iter = collector.getResult
- // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
- val set = scala.collection.mutable.Set[Int]()
- (1 to total).foreach { i =>
- assert(iter.hasNext)
- val o = iter.next()
- assert(o.size == 1)
- assert(! set.contains(o(0).asInstanceOf[Int]))
- set.add(o(0).asInstanceOf[Int])
- }
- assert(! iter.hasNext)
- }
-
- test("transfer data with 10K rows with 2 sender with error") {
- baseSender = new LocalResultSender(collector, 2)
- val total = 1000
- val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
- val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
- Await.result(sender1, 1 seconds)
- Await.result(sender2, 1 seconds)
-
- // println("type: " + collector.getResultType.toString)
- assert(OneColType.equals(collector.getResultType))
- val iter = collector.getResult
- // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
- val set = scala.collection.mutable.Set[Int]()
- intercept[RuntimeException] {
- (1 to total).foreach { i =>
- assert(iter.hasNext)
- val o = iter.next()
- assert(o.size == 1)
- assert(! set.contains(o(0).asInstanceOf[Int]))
- set.add(o(0).asInstanceOf[Int])
- }
- }
- // println(s"rows received: ${set.size}")
- }
-
- test("transfer data with Exception") {
- new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
- // println("type: " + collector.getResultType.toString)
- val iter = collector.getResult
- intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
- }
-
- def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
- intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
-
- test("transfer string pair data with 200 rows") {
- new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
- // println("type: " + collector.getResultType.toString)
- assert(TwoColType.equals(collector.getResultType))
- val iter = collector.getResult
- // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
- (1 to 1000).foreach { i =>
- assert(iter.hasNext)
- val o = iter.next()
- assert(o.size == 2)
- assert(o(0) == s"key-$i")
- assert(o(1) == s"value-$i")
- }
- assert(! iter.hasNext)
- }
-
- /**
- * Usage notes: There are 3 kinds of data to transfer:
- * (1) object, (2) byte array of serialized object, and (3) byte array
- * this test shows how to handle all of them.
- */
- test("DataSerializer usage") {
- val outBuf = new HeapDataOutputStream(1024, null)
- val inBuf = new ByteArrayDataInput()
-
- // 1. a regular object
- val hello = "Hello World!" * 30
- // serialize the data
- DataSerializer.writeObject(hello, outBuf)
- val bytesHello = outBuf.toByteArray.clone()
- // de-serialize the data
- inBuf.initialize(bytesHello, Version.CURRENT)
- val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
- assert(hello == hello2)
-
- // 2. byte array of serialized object
- // serialize: byte array from `CachedDeserializable`
- val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
- outBuf.reset()
- DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
- // de-serialize the data in 2 steps
- inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
- val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
- inBuf.initialize(bytesHello2, Version.CURRENT)
- val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
- assert(hello == hello3)
-
- // 3. byte array
- outBuf.reset()
- DataSerializer.writeByteArray(bytesHello, outBuf)
- inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
- val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
- assert(bytesHello sameElements bytesHello3)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
deleted file mode 100644
index 54394e8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.scalatest.FunSuite
-
-class QueryParserTest extends FunSuite {
-
- test("select * from /r1") {
- val r = QueryParser.parseOQL("select * from /r1").get
- assert(r == "List(/r1)")
- }
-
- test("select c2 from /r1") {
- val r = QueryParser.parseOQL("select c2 from /r1").get
- assert(r == "List(/r1)")
- }
-
- test("select key, value from /r1.entries") {
- val r = QueryParser.parseOQL("select key, value from /r1.entries").get
- assert(r == "List(/r1.entries)")
- }
-
- test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") {
- val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get
- assert(r == "List(/r1)")
- }
-
- test("select * from /r1/r2 where c1 >= 200") {
- val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get
- assert(r == "List(/r1/r2)")
- }
-
- test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") {
- val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
- assert(r == "List(/r1/r2, /r3/r4)")
- }
-
- test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") {
- val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get
- assert(r == "List(/r1/r2)")
- }
-
- test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") {
- val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get
- assert(r == "List(/root/sub.entries)")
- }
-
- test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") {
- val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get
- assert(r == "List(/region)")
- }
-
- test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") {
- val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get
- assert(r == "List(/QueryRegion1, /QueryRegion2)")
- }
-
- test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") {
- val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get
- println("r.type=" + r.getClass.getName + " r=" + r)
- assert(r == "List(/obj_obj_region)")
- }
-
- test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") {
- val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get
- assert(r == "List(/obj_obj_region, r.positions.values)")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
new file mode 100644
index 0000000..4e45dc2
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.commons.httpclient.HttpClient
+import java.io.File
+
+
+class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
+ val mockHttpClient: HttpClient = mock[HttpClient]
+
+ test("jmx url creation") {
+ val jmxHostAndPort = "localhost:7070"
+ val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+ val gfd = new GeodeFunctionDeployer(mockHttpClient);
+ val urlString = gfd.constructURLString(jmxHostAndPort)
+ assert(urlString === expectedUrlString)
+ }
+
+ test("missing jar file") {
+ val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+ val gfd = new GeodeFunctionDeployer(mockHttpClient);
+ intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
+ }
+
+ test("deploy with missing jar") {
+ val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+ val gfd = new GeodeFunctionDeployer(mockHttpClient);
+ intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
+ intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
+ }
+
+ test("successful mocked deploy") {
+ val gfd = new GeodeFunctionDeployer(mockHttpClient);
+ val jar = new File("README.md");
+ assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
new file mode 100644
index 0000000..798912c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+
+class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar {
+
+ test("DefaultGeodeConnectionFactory get/closeConnection") {
+ // note: connConf 1-4 share the same set of locators
+ val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+ val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+ val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
+ val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
+ val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
+
+ val props: Map[String, String] = Map.empty
+ val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+ val mockConn1 = mock[DefaultGeodeConnection]
+ val mockConn2 = mock[DefaultGeodeConnection]
+ when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
+ when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
+
+ assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
+ // note: following 3 lines do not trigger connFactory.newConnection(...)
+ assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+ assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
+ assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
+ assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
+
+ // connFactory.newConnection(...) were invoked only twice
+ verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
+ verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
+ assert(DefaultGeodeConnectionManager.connections.size == 3)
+
+ DefaultGeodeConnectionManager.closeConnection(connConf1)
+ assert(DefaultGeodeConnectionManager.connections.size == 1)
+ DefaultGeodeConnectionManager.closeConnection(connConf5)
+ assert(DefaultGeodeConnectionManager.connections.isEmpty)
+ }
+
+ test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
+ val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+ val props: Map[String, String] = Map.empty
+ val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+ when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
+ intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
+ verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
+ }
+
+ test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
+ val props: Map[String, String] = Map.empty
+ val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+ val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+ val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+ val mockConn1 = mock[DefaultGeodeConnection]
+ when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
+ assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+ assert(DefaultGeodeConnectionManager.connections.size == 1)
+ // connection does not exists in the connection manager
+ DefaultGeodeConnectionManager.closeConnection(connConf2)
+ assert(DefaultGeodeConnectionManager.connections.size == 1)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
new file mode 100644
index 0000000..c95f1dc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions
+
+import org.apache.geode.DataSerializer
+import org.apache.geode.cache.execute.{ResultCollector, ResultSender}
+import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
+import org.apache.geode.cache.query.types.ObjectType
+import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
+import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import scala.collection.JavaConversions._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter {
+
+ /**
+  * Test double that wires a struct ResultSender straight into a ResultCollector.
+  * Every chunk is cloned before it is handed over, because the
+  * StructStreamingResultSender will reuse its byte array.
+  */
+ class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
+
+   /** number of lastResult() calls seen so far; endResults() is triggered when it reaches `num` */
+   var finishedNum = 0
+
+   /** hands a private copy of the chunk to the collector */
+   private def forward(chunk: Object): Unit =
+     collector.addResult(null, chunk.asInstanceOf[Array[Byte]].clone())
+
+   override def sendResult(result: Object): Unit = forward(result)
+
+   /** exception should be sent via lastResult() */
+   override def sendException(throwable: Throwable): Unit =
+     throw new UnsupportedOperationException("sendException is not supported.")
+
+   override def lastResult(result: Object): Unit = {
+     forward(result)
+     this.synchronized { finishedNum += 1; if (finishedNum == num) collector.endResults() }
+   }
+ }
+
+ /** per-test fixtures, re-created by the `before` block: the collector under test and the sender feeding it */
+ var collector: StructStreamingResultCollector = _
+ var baseSender: LocalResultSender = _
+ /** struct types shared by the tests; every column is declared as java.lang.Object */
+ val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
+ val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
+ val OneColType = new StructTypeImpl(Array("value"), Array(objType))
+
+ before { // every test starts with a fresh collector fed by a sender that expects a single lastResult()
+   collector = new StructStreamingResultCollector()
+   baseSender = new LocalResultSender(collector, num = 1)
+ }
+
+ /** Round trip of the ten two-column rows with the struct type handed to the sender. */
+ test("transfer simple data") { verifySimpleTransfer(sendDataType = true) }
+
+ /**
+  * Same round trip, but the sender is given `null` instead of the struct type.
+  */
+ test("transfer simple data with no type info") { verifySimpleTransfer(sendDataType = false) }
+
+ /** Sends the rows (i, i.toString * 5) for i in 0..9 and checks that the collector reports TwoColType and
+  *  returns the rows unchanged and in order; `sendDataType` selects whether the sender gets TwoColType or null. */
+ def verifySimpleTransfer(sendDataType: Boolean): Unit = {
+   val rowsToSend = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
+   val structType = if (sendDataType) TwoColType else null
+   new StructStreamingResultSender(baseSender, structType, rowsToSend).send()
+   assert(TwoColType.equals(collector.getResultType))
+   val received = collector.getResult
+   for (expected <- 0 to 9) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 2)
+     assert(row(0).asInstanceOf[Int] == expected)
+     assert(row(1).asInstanceOf[String] == expected.toString * 5)
+   }
+   assert(!received.hasNext)
+ }
+
+ /**
+  * Builds an iterator over the single-column rows `start`, `start + 1`, ..., `start + n - 1`.
+  * @param start value of the first row
+  * @param n number of rows produced
+  * @param genExcp when true, hasNext stays true after the last row and that extra next() throws
+  *                RuntimeException("simulated error"). This is used to test exception handling.
+  */
+ def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] =
+   new Iterator[Array[Object]] {
+     private val lastValue = start + n - 1
+     private val hasNextBound = if (genExcp) lastValue + 1 else lastValue
+     private var current = start - 1
+
+     override def hasNext: Boolean = current < hasNextBound
+     override def next(): Array[Object] = {
+       if (current >= lastValue) throw new RuntimeException("simulated error")
+       current += 1
+       Array(current.asInstanceOf[Object])
+     }
+   }
+
+ test("transfer data with 0 row") {
+   val noRows = intIterator(1, 0, genExcp = false)
+   new StructStreamingResultSender(baseSender, OneColType, noRows).send()
+   // with no rows sent, the collector is expected to report no struct type at all
+   assert(collector.getResultType == null)
+   assert(!collector.getResult.hasNext)
+ }
+
+ test("transfer data with 10K rows") {
+   val rowCount = 10000
+   new StructStreamingResultSender(baseSender, OneColType, intIterator(1, rowCount, genExcp = false)).send()
+   assert(OneColType.equals(collector.getResultType))
+   val received = collector.getResult
+   // rows must come back complete and in the order they were sent
+   for (expected <- 1 to rowCount) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 1)
+     assert(row(0).asInstanceOf[Int] == expected)
+   }
+   assert(!received.hasNext)
+ }
+
+ // NOTE(review): despite "10K rows" in the name, this test transfers 300 rows (150 per sender).
+ test("transfer data with 10K rows with 2 sender") {
+   baseSender = new LocalResultSender(collector, 2)
+   val total = 300
+   val half = total / 2
+   val firstHalf = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, half, genExcp = false), "sender1").send() }
+   val secondHalf = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(half + 1, half, genExcp = false), "sender2").send() }
+   Await.result(firstHalf, 1.seconds)
+   Await.result(secondHalf, 1.seconds)
+   assert(OneColType.equals(collector.getResultType))
+   val received = collector.getResult
+   val seen = scala.collection.mutable.Set[Int]()
+   for (_ <- 1 to total) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 1)
+     val value = row(0).asInstanceOf[Int]
+     assert(!seen.contains(value)) // arrival order is not asserted, only that no value repeats
+     seen.add(value)
+   }
+   assert(!received.hasNext)
+ }
+
+ // The second sender's iterator throws after its last row; reading the results must throw. (Name says 10K; 1000 rows are generated.)
+ test("transfer data with 10K rows with 2 sender with error") {
+   baseSender = new LocalResultSender(collector, 2)
+   val total = 1000
+   val half = total / 2
+   val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, half, genExcp = false), "sender1").send() }
+   val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(half + 1, half, genExcp = true), "sender2").send() }
+   Await.result(sender1, 1.seconds) // method-call form: the postfix `1 seconds` needs scala.language.postfixOps
+   Await.result(sender2, 1.seconds)
+   assert(OneColType.equals(collector.getResultType))
+   val iter = collector.getResult
+   val seen = scala.collection.mutable.Set[Int]()
+   // NOTE(review): ScalaTest assertion failures are RuntimeExceptions as well, so a failed assert
+   // inside this block would also satisfy the intercept -- confirm that this is acceptable.
+   intercept[RuntimeException] {
+     (1 to total).foreach { _ =>
+       assert(iter.hasNext)
+       val row = iter.next()
+       assert(row.size == 1)
+       assert(!seen.contains(row(0).asInstanceOf[Int]))
+       seen.add(row(0).asInstanceOf[Int])
+     }
+   }
+ }
+
+ test("transfer data with Exception") {
+   new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
+   val received = collector.getResult
+   // draining the results is expected to surface the failure raised by the row iterator
+   intercept[RuntimeException] { received.foreach(row => row.mkString(",")) }
+ }
+
+ def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = // rows ("key-i", "value-i") for i = 1..n
+   for (row <- intIterator(1, n, genExcp)) yield Array[Object](s"key-${row(0)}", s"value-${row(0)}")
+
+ // NOTE(review): the name says 200 rows, but 1000 rows are sent and checked.
+ test("transfer string pair data with 200 rows") {
+   val rowCount = 1000
+   new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(rowCount, genExcp = false)).send()
+   assert(TwoColType.equals(collector.getResultType))
+   val received = collector.getResult
+   for (i <- 1 to rowCount) {
+     assert(received.hasNext)
+     val row = received.next()
+     assert(row.size == 2)
+     assert(row(0) == s"key-$i")
+     assert(row(1) == s"value-$i")
+   }
+   assert(!received.hasNext)
+ }
+
+ /**
+  * Usage notes: three kinds of payload can be transferred:
+  * (1) an object, (2) the byte array of a serialized object, and (3) a plain byte array.
+  * This test shows how to write and read back each of them.
+  */
+ test("DataSerializer usage") {
+   val output = new HeapDataOutputStream(1024, null)
+   val input = new ByteArrayDataInput()
+
+   // 1. a regular object
+   val greeting = "Hello World!" * 30
+   // write it as an object and keep a private copy of the produced bytes
+   DataSerializer.writeObject(greeting, output)
+   val greetingBytes = output.toByteArray.clone()
+   // read it back as an object
+   input.initialize(greetingBytes, Version.CURRENT)
+   val fromObject = DataSerializer.readObject(input).asInstanceOf[Object]
+   assert(greeting == fromObject)
+
+   // 2. byte array of serialized object
+   // the serialized form is taken from a `CachedDeserializable` and written as a byte array
+   val cached: CachedDeserializable = CachedDeserializableFactory.create(greetingBytes)
+   output.reset()
+   DataSerializer.writeByteArray(cached.getSerializedValue, output)
+   // reading takes two steps: first the byte array, then the object inside it
+   input.initialize(output.toByteArray.clone(), Version.CURRENT)
+   val innerBytes: Array[Byte] = DataSerializer.readByteArray(input)
+   input.initialize(innerBytes, Version.CURRENT)
+   val fromCached = DataSerializer.readObject(input).asInstanceOf[Object]
+   assert(greeting == fromCached)
+
+   // 3. byte array
+   output.reset()
+   DataSerializer.writeByteArray(greetingBytes, output)
+   input.initialize(output.toByteArray.clone(), Version.CURRENT)
+   val plainBytes: Array[Byte] = DataSerializer.readByteArray(input)
+   assert(greetingBytes sameElements plainBytes)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
new file mode 100644
index 0000000..54394e8
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import org.scalatest.FunSuite
+
+ class QueryParserTest extends FunSuite {
+
+   /**
+    * Registers a test that is named after `oql`, parses that query with QueryParser.parseOQL and
+    * compares the parse result with `expected`. The query text is written once per case, so a
+    * test name cannot drift away from the query the test actually runs.
+    */
+   private def parsesTo(oql: String, expected: String): Unit =
+     test(oql) {
+       assert(QueryParser.parseOQL(oql).get == expected)
+     }
+
+   // a single FROM entry, with and without a WHERE clause
+   parsesTo(
+     "select * from /r1",
+     "List(/r1)")
+
+   parsesTo(
+     "select c2 from /r1",
+     "List(/r1)")
+
+   parsesTo(
+     "select key, value from /r1.entries",
+     "List(/r1.entries)")
+
+   parsesTo(
+     "select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2",
+     "List(/r1)")
+
+   parsesTo(
+     "select * from /r1/r2 where c1 >= 200",
+     "List(/r1/r2)")
+
+   // IMPORT prefixes, DISTINCT, aliases, ORDER BY and mixed keyword case
+   parsesTo(
+     "import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100",
+     "List(/r1/r2, /r3/r4)")
+
+   parsesTo(
+     "SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100",
+     "List(/r1/r2)")
+
+   parsesTo(
+     "IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc",
+     "List(/root/sub.entries)")
+
+   parsesTo(
+     "select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status",
+     "List(/region)")
+
+   parsesTo(
+     "SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID",
+     "List(/QueryRegion1, /QueryRegion2)")
+
+   // double-quoted identifiers in the projection (no debug println of the result any more)
+   parsesTo(
+     "SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'",
+     "List(/obj_obj_region)")
+
+   parsesTo(
+     "SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'",
+     "List(/obj_obj_region, r.positions.values)")
+ }