Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:32 UTC

[03/14] incubator-geode git commit: GEODE-37 change package name from io.pivotal.geode (./geode-spark-connector/src/test/scala/io/pivotal) to org.apache.geode (./geode-spark-connector/src/test/scala/org/apache)

GEODE-37 change package name from io.pivotal.geode (./geode-spark-connector/src/test/scala/io/pivotal) to org.apache.geode (./geode-spark-connector/src/test/scala/org/apache)
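
For context, this commit moves the test sources verbatim (the blob hashes, e.g. 4e45dc2, are unchanged), so the package declarations inside the moved files still read io.pivotal.*. A minimal before/after sketch of the declaration change the rename implies, with the org.apache name taken from the commit subject and the new directory path; the declaration update itself is presumably handled in a later commit of this 14-commit series, not in this patch:

    // before: sources under src/test/scala/io/pivotal
    package io.pivotal.geode.spark.connector

    // after: sources under src/test/scala/org/apache (presumably updated in a later commit of this series)
    package org.apache.geode.spark.connector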


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/97658f04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/97658f04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/97658f04

Branch: refs/heads/develop
Commit: 97658f046e0c2f7a9f0a7eb366c07a91acb7650f
Parents: f530d3a
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700

----------------------------------------------------------------------
 .../connector/GeodeFunctionDeployerTest.scala   |  58 -----
 .../DefaultGeodeConnectionManagerTest.scala     |  82 ------
 ...tStreamingResultSenderAndCollectorTest.scala | 254 -------------------
 .../internal/oql/QueryParserTest.scala          |  83 ------
 .../connector/GeodeFunctionDeployerTest.scala   |  58 +++++
 .../DefaultGeodeConnectionManagerTest.scala     |  82 ++++++
 ...tStreamingResultSenderAndCollectorTest.scala | 254 +++++++++++++++++++
 .../internal/oql/QueryParserTest.scala          |  83 ++++++
 8 files changed, 477 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
deleted file mode 100644
index 4e45dc2..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import org.apache.commons.httpclient.HttpClient
-import java.io.File
-
-
-class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
-  val mockHttpClient: HttpClient = mock[HttpClient]
-    
-  test("jmx url creation") {
-    val jmxHostAndPort = "localhost:7070"
-    val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    val urlString = gfd.constructURLString(jmxHostAndPort)
-    assert(urlString === expectedUrlString)
-  }
-    
-  test("missing jar file") {
-    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
-  }
-  
-  test("deploy with missing jar") {
-    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
-    intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
-  }
-  
-  test("successful mocked deploy") {
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    val jar = new File("README.md");
-    assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
-  }
-  
-
-    
-}
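
The suite above drives GeodeFunctionDeployer entirely through a mocked HttpClient. As a rough usage sketch, assuming a real HttpClient, a REST endpoint reachable at localhost:7070, and an illustrative jar name (host:port and file name are not taken from the source), deployment looks like:

    import org.apache.commons.httpclient.HttpClient
    import java.io.File

    val deployer = new GeodeFunctionDeployer(new HttpClient())
    // constructURLString("localhost:7070") yields "http://localhost:7070/gemfire/v1/deployed",
    // and the suite expects a successful deploy response to contain "Deployed".
    val response = deployer.deploy("localhost:7070", new File("geode-functions.jar"))
    assert(response.contains("Deployed"))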

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
deleted file mode 100644
index 798912c..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-
-class DefaultGeodeConnectionManagerTest extends FunSuite  with Matchers with MockitoSugar {
-
-  test("DefaultGeodeConnectionFactory get/closeConnection") {
-    // note: connConf 1-4 share the same set of locators
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
-    val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
-    val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
-    val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
-
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    val mockConn1 = mock[DefaultGeodeConnection]
-    val mockConn2 = mock[DefaultGeodeConnection]
-    when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
-    when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
-
-    assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
-    // note: following 3 lines do not trigger connFactory.newConnection(...)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
-
-    // connFactory.newConnection(...) were invoked only twice
-    verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
-    verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
-    assert(DefaultGeodeConnectionManager.connections.size == 3)
-
-    DefaultGeodeConnectionManager.closeConnection(connConf1)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-    DefaultGeodeConnectionManager.closeConnection(connConf5)
-    assert(DefaultGeodeConnectionManager.connections.isEmpty)
-  }
-  
-  test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
-    intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
-    verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
-  }
-
-  test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
-    val mockConn1 = mock[DefaultGeodeConnection]
-    when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-    // connection does not exists in the connection manager
-    DefaultGeodeConnectionManager.closeConnection(connConf2)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-  }
-
-}
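
The first test above implies that DefaultGeodeConnectionManager caches one connection per set of locators, registering it under each locator so any conf that shares a locator reuses the same connection. A simplified, hypothetical model of that caching behavior (not Geode's actual implementation), written only to mirror the assertions on connConf1-5:

    import scala.collection.mutable

    object ConnectionCacheSketch {
      type Locator = (String, Int)
      // the String value stands in for a connection object
      private val cache = mutable.Map.empty[Locator, String]

      def getConnection(locators: Seq[Locator]): String =
        locators.collectFirst { case l if cache.contains(l) => cache(l) }.getOrElse {
          val conn = "connection-for-" + locators.mkString(",")
          locators.foreach(l => cache += l -> conn)   // register under every locator
          conn
        }

      def closeConnection(locators: Seq[Locator]): Unit =
        locators.collectFirst { case l if cache.contains(l) => cache(l) }.foreach { conn =>
          cache.retain((_, c) => c != conn)           // drop all locators mapped to that connection
        }

      def size: Int = cache.size
    }

Under this model the five getConnection calls in the test create only two connections and leave three cache entries (host1, host2, host5), which matches the connections.size assertions above.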

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
deleted file mode 100644
index c95f1dc..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions
-
-import org.apache.geode.DataSerializer
-import org.apache.geode.cache.execute.{ResultCollector, ResultSender}
-import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
-import org.apache.geode.cache.query.types.ObjectType
-import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
-import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-
-class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter  {
-
-  /** 
-    * A test ResultSender that connects struct ResultSender and ResultCollector 
-    * Note: this test ResultSender has to copy the data (byte array) since the
-    *       StructStreamingResultSender will reuse the byte array.
-    */
-  class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
-
-    var finishedNum = 0
-    
-    override def sendResult(result: Object): Unit =
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-
-    /** exception should be sent via lastResult() */
-    override def sendException(throwable: Throwable): Unit = 
-      throw new UnsupportedOperationException("sendException is not supported.")
-
-    override def lastResult(result: Object): Unit = {
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-      this.synchronized {
-        finishedNum += 1
-        if (finishedNum == num)
-          collector.endResults()
-      }
-    }
-  }
-
-  /** common variables */
-  var collector: StructStreamingResultCollector = _
-  var baseSender: LocalResultSender = _
-  /** common types */
-  val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
-  val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
-  val OneColType = new StructTypeImpl(Array("value"), Array(objType))
-
-  before {
-    collector = new StructStreamingResultCollector
-    baseSender = new LocalResultSender(collector, 1)
-  }
-  
-  test("transfer simple data") {
-    verifySimpleTransfer(sendDataType = true)
-  }
-
-  test("transfer simple data with no type info") {
-    verifySimpleTransfer(sendDataType = false)
-  }
-
-  def verifySimpleTransfer(sendDataType: Boolean): Unit = {
-    val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
-    val dataType = if (sendDataType) TwoColType else null
-    new StructStreamingResultSender(baseSender, dataType , iter).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(TwoColType.equals(collector.getResultType))
-    val iter2 = collector.getResult
-    (0 to 9).foreach { i =>
-      assert(iter2.hasNext)
-      val o = iter2.next()
-      assert(o.size == 2)
-      assert(o(0).asInstanceOf[Int] == i)
-      assert(o(1).asInstanceOf[String] == i.toString * 5)
-    }
-    assert(! iter2.hasNext)
-  }
-
-  
-  /**
-   * A test iterator that generate integer data
-   * @param start the 1st value
-   * @param n number of integers generated
-   * @param genExcp generate Exception if true. This is used to test exception handling.
-   */
-  def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
-    new Iterator[Array[Object]] {
-      val max = if (genExcp) start + n else start + n - 1
-      var index: Int = start - 1
-
-      override def hasNext: Boolean = if (index < max) true else false
-
-      override def next(): Array[Object] =
-        if (index < (start + n - 1)) {
-          index += 1
-          Array(index.asInstanceOf[Object])
-        } else throw new RuntimeException("simulated error")
-    }
-  }
-
-  test("transfer data with 0 row") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(collector.getResultType == null)
-    val iter = collector.getResult
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    (1 to 10000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(o(0).asInstanceOf[Int] == i)
-    }
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows with 2 sender") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 300
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
-    Await.result(sender1, 1.seconds)
-    Await.result(sender2, 1.seconds)
-
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    val set = scala.collection.mutable.Set[Int]()
-    (1 to total).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(! set.contains(o(0).asInstanceOf[Int]))
-      set.add(o(0).asInstanceOf[Int])
-    }
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows with 2 sender with error") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 1000
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
-    Await.result(sender1, 1 seconds)
-    Await.result(sender2, 1 seconds)
-
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    val set = scala.collection.mutable.Set[Int]()
-    intercept[RuntimeException] {
-      (1 to total).foreach { i =>
-        assert(iter.hasNext)
-        val o = iter.next()
-        assert(o.size == 1)
-        assert(! set.contains(o(0).asInstanceOf[Int]))
-        set.add(o(0).asInstanceOf[Int])
-      }
-    }
-    // println(s"rows received: ${set.size}")
-  }
-
-  test("transfer data with Exception") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
-    // println("type: " + collector.getResultType.toString)
-    val iter = collector.getResult
-    intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
-  }
-
-  def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
-    intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
-
-  test("transfer string pair data with 200 rows") {
-    new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(TwoColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    (1 to 1000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 2)
-      assert(o(0) == s"key-$i")
-      assert(o(1) == s"value-$i")
-    }
-    assert(! iter.hasNext)
-  }
-
-  /**
-   * Usage notes: There are 3 kinds of data to transfer:
-   * (1) object, (2) byte array of serialized object, and (3) byte array
-   * this test shows how to handle all of them.
-   */
-  test("DataSerializer usage") {
-    val outBuf = new HeapDataOutputStream(1024, null)
-    val inBuf = new ByteArrayDataInput()
-
-    // 1. a regular object
-    val hello = "Hello World!" * 30
-    // serialize the data
-    DataSerializer.writeObject(hello, outBuf)
-    val bytesHello = outBuf.toByteArray.clone()
-    // de-serialize the data
-    inBuf.initialize(bytesHello, Version.CURRENT)
-    val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello2)
-    
-    // 2. byte array of serialized object
-    // serialize: byte array from `CachedDeserializable`
-    val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
-    outBuf.reset()
-    DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
-    // de-serialize the data in 2 steps
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    inBuf.initialize(bytesHello2, Version.CURRENT)
-    val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello3)
-
-    // 3. byte array
-    outBuf.reset()
-    DataSerializer.writeByteArray(bytesHello, outBuf)
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    assert(bytesHello sameElements bytesHello3)
-  }
-}
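
The LocalResultSender above clones every byte array it receives because, as its doc comment notes, StructStreamingResultSender reuses the underlying buffer. A minimal, Geode-independent sketch of why keeping a reference instead of a copy loses data:

    val buffer = new Array[Byte](4)
    val keptByReference = scala.collection.mutable.Buffer.empty[Array[Byte]]
    val keptByCopy = scala.collection.mutable.Buffer.empty[Array[Byte]]

    for (i <- 1 to 3) {
      java.util.Arrays.fill(buffer, i.toByte)   // the "sender" refills the same buffer each time
      keptByReference += buffer                 // all three entries alias the one shared array
      keptByCopy += buffer.clone()              // each entry is an independent snapshot
    }

    assert(keptByReference.forall(_.head == 3))            // earlier results were overwritten
    assert(keptByCopy.map(_.head) == Seq[Byte](1, 2, 3))   // clones preserve each result

Cloning in sendResult and lastResult avoids exactly this aliasing.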

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
deleted file mode 100644
index 54394e8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.scalatest.FunSuite
-
-class QueryParserTest extends FunSuite {
-
-  test("select * from /r1") {
-    val r = QueryParser.parseOQL("select * from /r1").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select c2 from /r1") {
-    val r = QueryParser.parseOQL("select c2 from /r1").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select key, value from /r1.entries") {
-    val r = QueryParser.parseOQL("select key, value from /r1.entries").get
-    assert(r == "List(/r1.entries)")
-  }
-
-  test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") {
-    val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select * from /r1/r2 where c1 >= 200") {
-    val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get
-    assert(r == "List(/r1/r2)")
-  }
-
-  test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") {
-    val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
-    assert(r == "List(/r1/r2, /r3/r4)")
-  }
-
-  test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") {
-    val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get
-    assert(r == "List(/r1/r2)")
-  }
-
-  test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") {
-    val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get
-    assert(r == "List(/root/sub.entries)")
-  }
-
-  test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") {
-    val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get
-    assert(r == "List(/region)")
-  }
-
-  test("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID") {
-    val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID").get
-    assert(r == "List(/QueryRegion1, /QueryRegion2)")
-  }
-
-  test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") {
-    val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get
-    println("r.type=" + r.getClass.getName + " r=" + r)
-    assert(r == "List(/obj_obj_region)")
-  }
-
-  test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") {
-    val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get
-    assert(r == "List(/obj_obj_region, r.positions.values)")
-  }
-}
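
QueryParserTest exercises QueryParser.parseOQL, which extracts the region paths referenced by an OQL query. A minimal usage sketch mirroring the assertions above (the suite compares the parse result against its List-style string form):

    val regionPaths = QueryParser.parseOQL("select * from /r1").get
    assert(regionPaths == "List(/r1)")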

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
new file mode 100644
index 0000000..4e45dc2
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.commons.httpclient.HttpClient
+import java.io.File
+
+
+class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
+  val mockHttpClient: HttpClient = mock[HttpClient]
+    
+  test("jmx url creation") {
+    val jmxHostAndPort = "localhost:7070"
+    val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+    val gfd = new GeodeFunctionDeployer(mockHttpClient);
+    val urlString = gfd.constructURLString(jmxHostAndPort)
+    assert(urlString === expectedUrlString)
+  }
+    
+  test("missing jar file") {
+    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+    val gfd = new GeodeFunctionDeployer(mockHttpClient);
+    intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
+  }
+  
+  test("deploy with missing jar") {
+    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+    val gfd = new GeodeFunctionDeployer(mockHttpClient);
+    intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
+    intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
+  }
+  
+  test("successful mocked deploy") {
+    val gfd = new GeodeFunctionDeployer(mockHttpClient);
+    val jar = new File("README.md");
+    assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
+  }
+  
+
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
new file mode 100644
index 0000000..798912c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+
+class DefaultGeodeConnectionManagerTest extends FunSuite  with Matchers with MockitoSugar {
+
+  test("DefaultGeodeConnectionFactory get/closeConnection") {
+    // note: connConf 1-4 share the same set of locators
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+    val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
+    val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
+    val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
+
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    val mockConn1 = mock[DefaultGeodeConnection]
+    val mockConn2 = mock[DefaultGeodeConnection]
+    when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
+    when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
+
+    assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
+    // note: following 3 lines do not trigger connFactory.newConnection(...)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
+
+    // connFactory.newConnection(...) were invoked only twice
+    verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
+    verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
+    assert(DefaultGeodeConnectionManager.connections.size == 3)
+
+    DefaultGeodeConnectionManager.closeConnection(connConf1)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+    DefaultGeodeConnectionManager.closeConnection(connConf5)
+    assert(DefaultGeodeConnectionManager.connections.isEmpty)
+  }
+  
+  test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
+    intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
+    verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
+  }
+
+  test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
+    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
+    val mockConn1 = mock[DefaultGeodeConnection]
+    when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
+    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+    // connection does not exists in the connection manager
+    DefaultGeodeConnectionManager.closeConnection(connConf2)
+    assert(DefaultGeodeConnectionManager.connections.size == 1)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
new file mode 100644
index 0000000..c95f1dc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions
+
+import org.apache.geode.DataSerializer
+import org.apache.geode.cache.execute.{ResultCollector, ResultSender}
+import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
+import org.apache.geode.cache.query.types.ObjectType
+import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
+import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import scala.collection.JavaConversions._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter  {
+
+  /** 
+    * A test ResultSender that connects struct ResultSender and ResultCollector 
+    * Note: this test ResultSender has to copy the data (byte array) since the
+    *       StructStreamingResultSender will reuse the byte array.
+    */
+  class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
+
+    var finishedNum = 0
+    
+    override def sendResult(result: Object): Unit =
+      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
+
+    /** exception should be sent via lastResult() */
+    override def sendException(throwable: Throwable): Unit = 
+      throw new UnsupportedOperationException("sendException is not supported.")
+
+    override def lastResult(result: Object): Unit = {
+      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
+      this.synchronized {
+        finishedNum += 1
+        if (finishedNum == num)
+          collector.endResults()
+      }
+    }
+  }
+
+  /** common variables */
+  var collector: StructStreamingResultCollector = _
+  var baseSender: LocalResultSender = _
+  /** common types */
+  val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
+  val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
+  val OneColType = new StructTypeImpl(Array("value"), Array(objType))
+
+  before {
+    collector = new StructStreamingResultCollector
+    baseSender = new LocalResultSender(collector, 1)
+  }
+  
+  test("transfer simple data") {
+    verifySimpleTransfer(sendDataType = true)
+  }
+
+  test("transfer simple data with no type info") {
+    verifySimpleTransfer(sendDataType = false)
+  }
+
+  def verifySimpleTransfer(sendDataType: Boolean): Unit = {
+    val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
+    val dataType = if (sendDataType) TwoColType else null
+    new StructStreamingResultSender(baseSender, dataType , iter).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(TwoColType.equals(collector.getResultType))
+    val iter2 = collector.getResult
+    (0 to 9).foreach { i =>
+      assert(iter2.hasNext)
+      val o = iter2.next()
+      assert(o.size == 2)
+      assert(o(0).asInstanceOf[Int] == i)
+      assert(o(1).asInstanceOf[String] == i.toString * 5)
+    }
+    assert(! iter2.hasNext)
+  }
+
+  
+  /**
+   * A test iterator that generate integer data
+   * @param start the 1st value
+   * @param n number of integers generated
+   * @param genExcp generate Exception if true. This is used to test exception handling.
+   */
+  def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
+    new Iterator[Array[Object]] {
+      val max = if (genExcp) start + n else start + n - 1
+      var index: Int = start - 1
+
+      override def hasNext: Boolean = if (index < max) true else false
+
+      override def next(): Array[Object] =
+        if (index < (start + n - 1)) {
+          index += 1
+          Array(index.asInstanceOf[Object])
+        } else throw new RuntimeException("simulated error")
+    }
+  }
+
+  test("transfer data with 0 row") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(collector.getResultType == null)
+    val iter = collector.getResult
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    (1 to 10000).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 1)
+      assert(o(0).asInstanceOf[Int] == i)
+    }
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows with 2 sender") {
+    baseSender = new LocalResultSender(collector, 2)
+    val total = 300
+    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
+    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
+    Await.result(sender1, 1.seconds)
+    Await.result(sender2, 1.seconds)
+
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    val set = scala.collection.mutable.Set[Int]()
+    (1 to total).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 1)
+      assert(! set.contains(o(0).asInstanceOf[Int]))
+      set.add(o(0).asInstanceOf[Int])
+    }
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows with 2 sender with error") {
+    baseSender = new LocalResultSender(collector, 2)
+    val total = 1000
+    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
+    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
+    Await.result(sender1, 1 seconds)
+    Await.result(sender2, 1 seconds)
+
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    val set = scala.collection.mutable.Set[Int]()
+    intercept[RuntimeException] {
+      (1 to total).foreach { i =>
+        assert(iter.hasNext)
+        val o = iter.next()
+        assert(o.size == 1)
+        assert(! set.contains(o(0).asInstanceOf[Int]))
+        set.add(o(0).asInstanceOf[Int])
+      }
+    }
+    // println(s"rows received: ${set.size}")
+  }
+
+  test("transfer data with Exception") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
+    // println("type: " + collector.getResultType.toString)
+    val iter = collector.getResult
+    intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
+  }
+
+  def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
+    intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
+
+  test("transfer string pair data with 200 rows") {
+    new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(TwoColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    (1 to 1000).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 2)
+      assert(o(0) == s"key-$i")
+      assert(o(1) == s"value-$i")
+    }
+    assert(! iter.hasNext)
+  }
+
+  /**
+   * Usage notes: There are 3 kinds of data to transfer:
+   * (1) object, (2) byte array of serialized object, and (3) byte array
+   * this test shows how to handle all of them.
+   */
+  test("DataSerializer usage") {
+    val outBuf = new HeapDataOutputStream(1024, null)
+    val inBuf = new ByteArrayDataInput()
+
+    // 1. a regular object
+    val hello = "Hello World!" * 30
+    // serialize the data
+    DataSerializer.writeObject(hello, outBuf)
+    val bytesHello = outBuf.toByteArray.clone()
+    // de-serialize the data
+    inBuf.initialize(bytesHello, Version.CURRENT)
+    val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+    assert(hello == hello2)
+    
+    // 2. byte array of serialized object
+    // serialize: byte array from `CachedDeserializable`
+    val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
+    outBuf.reset()
+    DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
+    // de-serialize the data in 2 steps
+    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+    val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
+    inBuf.initialize(bytesHello2, Version.CURRENT)
+    val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+    assert(hello == hello3)
+
+    // 3. byte array
+    outBuf.reset()
+    DataSerializer.writeByteArray(bytesHello, outBuf)
+    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+    val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
+    assert(bytesHello sameElements bytesHello3)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
new file mode 100644
index 0000000..54394e8
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import org.scalatest.FunSuite
+
+class QueryParserTest extends FunSuite {
+
+  test("select * from /r1") {
+    val r = QueryParser.parseOQL("select * from /r1").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select c2 from /r1") {
+    val r = QueryParser.parseOQL("select c2 from /r1").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select key, value from /r1.entries") {
+    val r = QueryParser.parseOQL("select key, value from /r1.entries").get
+    assert(r == "List(/r1.entries)")
+  }
+
+  test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") {
+    val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select * from /r1/r2 where c1 >= 200") {
+    val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get
+    assert(r == "List(/r1/r2)")
+  }
+
+  test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") {
+    val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
+    assert(r == "List(/r1/r2, /r3/r4)")
+  }
+
+  test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") {
+    val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get
+    assert(r == "List(/r1/r2)")
+  }
+
+  test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") {
+    val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get
+    assert(r == "List(/root/sub.entries)")
+  }
+
+  test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") {
+    val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get
+    assert(r == "List(/region)")
+  }
+
+  test("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID") {
+    val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID").get
+    assert(r == "List(/QueryRegion1, /QueryRegion2)")
+  }
+
+  test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") {
+    val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get
+    println("r.type=" + r.getClass.getName + " r=" + r)
+    assert(r == "List(/obj_obj_region)")
+  }
+
+  test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") {
+    val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get
+    assert(r == "List(/obj_obj_region, r.positions.values)")
+  }
+}