You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:34 UTC

[05/14] incubator-geode git commit: GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/it/scala/ittest/io/pivotal)to org.apache.geode for(to ./geode-spark-connector/src/it/scala/ittest/org/apache)

GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/it/scala/ittest/io/pivotal)to org.apache.geode for(to ./geode-spark-connector/src/it/scala/ittest/org/apache)


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/03e60a67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/03e60a67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/03e60a67

Branch: refs/heads/develop
Commit: 03e60a672577ffde93cfe3633ccd1d1ab7f3387e
Parents: 2d374c9
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700

----------------------------------------------------------------------
 .../spark/connector/BasicIntegrationTest.scala  | 598 -------------------
 .../RDDJoinRegionIntegrationTest.scala          | 300 ----------
 .../RetrieveRegionIntegrationTest.scala         | 253 --------
 .../pivotal/geode/spark/connector/package.scala |  29 -
 .../spark/connector/testkit/GeodeCluster.scala  |  47 --
 .../spark/connector/testkit/GeodeRunner.scala   | 148 -----
 .../geode/spark/connector/testkit/IOUtils.scala |  94 ---
 .../spark/connector/BasicIntegrationTest.scala  | 598 +++++++++++++++++++
 .../RDDJoinRegionIntegrationTest.scala          | 300 ++++++++++
 .../RetrieveRegionIntegrationTest.scala         | 253 ++++++++
 .../apache/geode/spark/connector/package.scala  |  29 +
 .../spark/connector/testkit/GeodeCluster.scala  |  47 ++
 .../spark/connector/testkit/GeodeRunner.scala   | 148 +++++
 .../geode/spark/connector/testkit/IOUtils.scala |  94 +++
 14 files changed, 1469 insertions(+), 1469 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
deleted file mode 100644
index cb1b329..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
+++ /dev/null
@@ -1,598 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector
-
-import java.util.Properties
-import org.apache.geode.cache.query.QueryService
-import org.apache.geode.cache.query.internal.StructImpl
-import io.pivotal.geode.spark.connector._
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager}
-import io.pivotal.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD}
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions
-import scala.reflect.ClassTag
-
-case class Number(str: String, len: Int)
-
-class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
-    settings.setProperty("num-of-servers", "2")
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-                            "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("BasicIntegrationTest")
-      .setMaster("local[2]")
-      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator")
-
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-
-  //Convert Map[Object, Object] to java.util.Properties
-  private def map2Props(map: Map[Object, Object]): java.util.Properties =
-    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
-
-  // ===========================================================
-  //       DefaultGeodeConnection functional tests
-  // ===========================================================
-
-  test("DefaultGeodeConnection.validateRegion()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    // normal exist-region
-    var regionPath: String = "str_str_region"
-    conn.validateRegion[String, String](regionPath)
-
-    // non-exist region
-    regionPath = "non_exist_region"
-    try {
-      conn.validateRegion[String, String](regionPath)
-      fail("validateRegion failed to catch non-exist region error")
-    } catch {
-      case e: RuntimeException => 
-        if (! e.getMessage.contains(s"The region named $regionPath was not found"))
-          fail("validateRegion gives wrong exception on non-exist region", e)
-      case e: Throwable => 
-        fail("validateRegion gives wrong exception on non-exist region", e)
-    }
-
-    // Note: currently, can't catch type mismatch error
-    conn.validateRegion[String, Integer]("str_str_region")
-  }
-
-  test("DefaultGeodeConnection.getRegionMetadata()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    // exist region
-    validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
-    validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
-    validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
-
-    // non-exist region
-    assert(! conn.getRegionMetadata("no_exist_region").isDefined)
-  }
-    
-  def validateRegionMetadata(
-    conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int,
-    keyType: String, valueType: String, emptyMap: Boolean): Unit = {
-
-    val mdOption = conn.getRegionMetadata(regionPath)
-    val md = mdOption.get
-   
-    assert(md.getRegionPath == s"/$regionPath")
-    assert(md.isPartitioned == partitioned)
-    assert(md.getKeyTypeName == keyType)
-    assert(md.getValueTypeName == valueType)
-    assert(md.getTotalBuckets == buckets)
-    if (emptyMap) assert(md.getServerBucketMap == null) 
-    else assert(md.getServerBucketMap != null)
-  }
-
-  test("DefaultGeodeConnection.getRegionProxy()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    val region1 = conn.getRegionProxy[String, String]("str_str_region")
-    region1.put("1", "One")
-    assert(region1.get("1") == "One")
-    region1.remove("1")
-    assert(region1.get("1") == null)
-    
-    // getRegionProxy doesn't fail when region doesn't exist
-    val region2 = conn.getRegionProxy[String, String]("non_exist_region")
-    try {
-      region2.put("1", "One")
-      fail("getRegionProxy failed to catch non-exist region error")
-    } catch {
-      case e: Exception =>
-        if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) {
-          e.printStackTrace()
-          fail("validateRegion gives wrong exception on non-exist region", e)
-        }
-    }
-  }
-  
-  // Note: DefaultGeodeConnecton.getQuery() and getRegionData() are covered by
-  //       RetrieveRegionIntegrationTest.scala and following OQL tests.
-  
-  // ===========================================================
-  //                OQL functional tests
-  // ===========================================================
-  
-  private def initRegion(regionName: String): Unit = {
-
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-
-    //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String]
-    var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
-    var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
-    val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("1", portfolio1)
-
-    position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925"))
-    position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972"))
-    val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("2", portfolio2)
-
-    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
-    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
-    val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("3", portfolio3)
-
-    position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("4", portfolio4)
-
-    position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("5", portfolio5)
-
-    position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("6", portfolio6)
-
-    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
-    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
-    val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug
-    //portfolio7.setType(null)
-    rgn.put("7", portfolio7)
-  }
-
-  private def getQueryRDD[T: ClassTag](
-    query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] =
-      new QueryRDD[T](sc, query, connConf)
-
-  test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") {
-    simpleQuery("obj_obj_region")
-  }
-
-  test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
-    simpleQuery("obj_obj_rep_region")
-  }
-
-  private def simpleQuery(regionName: String) {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")
-
-    //verify the QueryRDD
-    val oqlRS: Array[String] = OQLResult.collect()
-    oqlRS should have length 3
-    oqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    // this is used to implicitly convert an RDD to a DataFrame.
-    import sqlContext.implicits._
-    val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
-    //Register dataFrame as a table of two columns of type String and Int respectively
-    dataFrame.registerTempTable("numberTable")
-
-    //Issue SQL query against the table
-    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 3
-    sqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-    //Register dataFrame2 as a table of two columns of type String and Int respectively
-    dataFrame2.registerTempTable("numberTable2")
-
-    //Issue SQL query against the table
-    val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
-    sqlRS2 should have length 3
-    sqlRS2 should contain theSameElementsAs List("one", "two", "three")
-
-    //Remove the region entries, because other tests might use the same region as well
-    List("1", "2", "3").foreach(rgn.remove)
-  }
-
-  test("Run Geode OQL query and directly return DataFrame: Partitioned Region") {
-    simpleQueryDataFrame("obj_obj_region")
-  }
-
-  test("Run Geode OQL query and directly return DataFrame: Replicated Region") {
-    simpleQueryDataFrame("obj_obj_rep_region")
-  }
-
-  private def simpleQueryDataFrame(regionName: String) {
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName")
-    dataFrame.registerTempTable("numberTable")
-
-    //Issue SQL query against the table
-    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 3
-    sqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Remove the region entries, because other tests might use the same region as well
-    List("1", "2", "3").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with UDT: Partitioned Region") {
-    queryUDT("obj_obj_region")
-  }
-
-  test("Geode OQL query with UDT: Replicated Region") {
-    queryUDT("obj_obj_rep_region")
-  }
-
-  private def queryUDT(regionName: String) {
-
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    val e1: Employee = new Employee("hello", 123)
-    val e2: Employee = new Employee("world", 456)
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Object] = OQLResult.collect()
-    oqlRS should have length 2
-    oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-    dataFrame.registerTempTable("employee")
-    val SQLResult = sqlContext.sql("SELECT * FROM employee")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 2
-    sqlRS should contain theSameElementsAs List("hello", "world")
-
-    List("1", "2").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") {
-    queryUDTDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") {
-    queryUDTDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryUDTDataFrame(regionName: String) {
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    val e1: Employee = new Employee("hello", 123)
-    val e2: Employee = new Employee("world", 456)
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName")
-
-    dataFrame.registerTempTable("employee")
-    val SQLResult = sqlContext.sql("SELECT * FROM employee")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 2
-    sqlRS should contain theSameElementsAs List("hello", "world")
-
-    List("1", "2").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with more complex UDT: Partitioned Region") {
-    complexUDT("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT: Replicated Region") {
-    complexUDT("obj_obj_rep_region")
-  }
-
-  private def complexUDT(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
-    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
-    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
-  }
-
-  test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") {
-    complexUDTDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") {
-    complexUDTDataFrame("obj_obj_rep_region")
-  }
-
-  private def complexUDTDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
-    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection: Partitioned Region") {
-    queryComplexUDTProjection("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection: Replicated Region") {
-    queryComplexUDTProjection("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTProjection(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
-    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") {
-    queryComplexUDTProjectionDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") {
-    queryComplexUDTProjectionDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTProjectionDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") {
-    queryComplexUDTNestProjectionDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") {
-    queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTNestProjectionDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Undefined instance deserialization: Partitioned Region") {
-    undefinedInstanceDeserialization("obj_obj_region")
-  }
-
-  test("Undefined instance deserialization: Replicated Region") {
-    undefinedInstanceDeserialization("obj_obj_rep_region")
-  }
-
-  private def undefinedInstanceDeserialization(regionName: String) {
-
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-
-    //Put some new data
-    rgn.put("1", "one")
-
-    //Query some non-existent columns, which should return UNDEFINED
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName")
-    val col1 = dataFrame.first().apply(0)
-    val col2 = dataFrame.first().apply(1)
-    assert(col1 == QueryService.UNDEFINED)
-    assert(col2 == QueryService.UNDEFINED)
-    //Verify that col1 and col2 refer to the same Undefined object
-    assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
-  }
-
-  test("RDD.saveToGeode") {
-    val regionName = "str_str_region"
-    // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66))
-    val data = (1 to 6).map(_.toString).map(e=> (e, e*2))
-    val rdd = sc.parallelize(data)
-    rdd.saveToGeode(regionName)
-
-    // verify
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
-    println("region key set on server: " + region.keySetOnServer())
-    assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
-    (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
-  }
-
-  // ===========================================================
-  //        DStream.saveToGeode() functional tests
-  // ===========================================================
-
-  test("Basic DStream test") {
-    import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
-    import io.pivotal.geode.spark.connector.streaming._
-    import org.apache.spark.streaming.ManualClockHelper
-
-    class TestStreamListener extends StreamingListener {
-      var count = 0
-      override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1
-    }
-
-    def batchDuration = Seconds(1)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val input = Seq(1 to 4, 5 to 8, 9 to 12)
-    val dstream = new TestInputDStream(ssc, input, 2)
-    dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e))
-    try {
-      val listener = new TestStreamListener
-      ssc.addStreamingListener(listener)
-      ssc.start()
-      ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
-      while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
-    } catch {
-      case e: Exception => e.printStackTrace(); throw e
-//    } finally {
-//      ssc.stop()
-    }
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val region: Region[String, Int] = conn.getRegionProxy("str_int_region")
-
-    // verify geode region contents
-    println("region key set on server: " + region.keySetOnServer())
-    assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
-    (1 to 12).foreach(e => assert(e == region.get(e.toString)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
deleted file mode 100644
index 04d4198..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector
-
-import java.util.Properties
-
-import io.pivotal.geode.spark.connector._
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-  val numServers = 3
-  val numObjects = 1000
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
-    settings.setProperty("num-of-servers", numServers.toString)
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-      "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("RDDJoinRegionIntegrationTest")
-      .setMaster("local[2]")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-
-//  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
-//    assert(map1.size == map2.size)
-//    map1.foreach(e => {
-//      assert(map2.contains(e._1))
-//      assert (e._2 == map2.get(e._1).get)
-//    })
-//  }
-  
-  // -------------------------------------------------------------------------------------------- 
-  // PairRDD.joinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
-  // -------------------------------------------------------------------------------------------- 
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => ("k_" + x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
-    // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------------
-  // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
-  // -------------------------------------------------------------------------------------------------------
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => (x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, func /*, connConf*/)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
-    // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------ 
-  // PairRDD.outerJoinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
-  // ------------------------------------------------------------------------------------------------ 
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => ("k_" + x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath /*, connConf*/)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) ((s"k_$i", i * 2), None)
-      else ((s"k_$i", i*2), Some(i))}.toMap
-    // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------------
-  // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
-  // -------------------------------------------------------------------------------------------------------
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => (x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, func, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) ((i, i * 2), None)
-      else ((i, i*2), Some(i))}.toMap
-    // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // --------------------------------------------------------------------------------------------
-  // RDD.joinGeodeRegion[K, V](regionPath, T => K,  connConf): GeodeJoinRDD[T, K, V]
-  // --------------------------------------------------------------------------------------------
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
-    verifyRDDJoinRegion("rr_str_int_region")
-  }
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
-    verifyRDDJoinRegion("pr_str_int_region")
-  }
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
-    verifyRDDJoinRegion("pr_r_str_int_region")
-  }
-
-  def verifyRDDJoinRegion(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => s"k_$x")
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, x => x, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
-    // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // --------------------------------------------------------------------------------------------
-  // RDD.outerJoinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
-  // --------------------------------------------------------------------------------------------
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) {
-    verifyRDDOuterJoinRegion("rr_str_int_region")
-  }
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) {
-    verifyRDDOuterJoinRegion("pr_str_int_region")
-  }
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) {
-    verifyRDDOuterJoinRegion("pr_r_str_int_region")
-  }
-
-  def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => s"k_$x")
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, x => x /*, connConf */)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) (s"k_$i", None)
-           else (s"k_$i", Some(i))}.toMap
-    // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
deleted file mode 100644
index 93e7cbf..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector
-
-import java.util.Properties
-
-import io.pivotal.geode.spark.connector._
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-
-class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-  val numServers = 4
-  val numObjects = 1000
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
-    settings.setProperty("num-of-servers", numServers.toString)
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-                            "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("RetrieveRegionIntegrationTest")
-      .setMaster("local[2]")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-  
-  def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[K, V] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-    verifyRetrieveRegion[K,V](regionName, entriesMap)
-  }
-    
-  def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V])  = {
-    val rdd = sc.geodeRegion(regionName)
-    val collectedObjs = rdd.collect()
-    collectedObjs should have length entriesMap.size
-    import scala.collection.JavaConverters._
-    matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
-  }
- 
-  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
-    assert(map1.size == map2.size)
-    map1.foreach(e => {
-      assert(map2.contains(e._1))
-      assert (e._2 == map2.get(e._1).get)
-      }
-    )
-  }
-  
-  //Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
-  //This test has to run first...the rest of the tests always use the same num objects
-  test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
-    val numObjects = numServers - 1
-    val entriesMap:JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-    executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
-  }
-
-  //Test for retrieving from region containing string key and int value
-  def verifyRetrieveStringStringRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, String] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
-    executeTest[String, String](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate redundant string string") {
-    verifyRetrieveStringStringRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string string") {
-    verifyRetrieveStringStringRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string string") {
-    verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
-  }
-  
-
-  //Test for retrieving from region containing string key and string value
-  def verifyRetrieveStringIntRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-    executeTest[String, Int](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string int region") {
-    verifyRetrieveStringIntRegion("rr_str_int_region")
-  }
-
-  test("Retrieve Region with partitioned string int region") {
-    verifyRetrieveStringIntRegion("pr_str_int_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string int region") {
-    verifyRetrieveStringIntRegion("pr_r_str_int_region")
-  }
-
-  //Tests for retrieving from region containing string key and object value
-  def verifyRetrieveStringObjectRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Object] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
-    executeTest[String, Object](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string obj") {
-    verifyRetrieveStringObjectRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string obj") {
-    verifyRetrieveStringObjectRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string obj") {
-    verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
-  }
-
-  //Test for retrieving from region containing string key and map value
-  def verifyRetrieveStringMapRegion(regionName:String) = {
-    val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
-    (0 until numObjects).map(i => {
-      val hashMap:JHashMap[String, String] = new JHashMap()
-      hashMap.put("mapKey:" + i, "mapValue:" + i)
-      entriesMap.put("key_" + i, hashMap)
-    })
-    executeTest(regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string map region") {
-    verifyRetrieveStringMapRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string map region") {
-    verifyRetrieveStringMapRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string map region") {
-    verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
-  }
-  
-  //Test and helpers specific for retrieving from region containing string key and byte[] value
-  def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
-    rgn.putAll(entriesMap)
-    verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
-  }
-  
-  def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]])  = {
-    val rdd = sc.geodeRegion(regionName)
-    val collectedObjs = rdd.collect()
-    collectedObjs should have length entriesMap.size
-    import scala.collection.JavaConverters._
-    matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
-  }
-  
-  def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
-    map1.foreach(e => {
-      assert(map2.contains(e._1))
-      assert (java.util.Arrays.equals(e._2, map2.get(e._1).get))
-      }
-    )
-    assert(map1.size == map2.size)
-
-  }
-  
-  def verifyRetrieveStringByteArrayRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
-    executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
-  }
-      
-  test("Retrieve Region with replicate region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partition region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partition redundant region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
-  }
-
-  test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
-  }
-
-  test("Retrieve Region with where clause on partitioned region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("pr_str_int_region")
-  }
-
-  test("Retrieve Region with where clause on replicated region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("rr_str_int_region")
-  }
-
-  def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val rdd = sc.geodeRegion(regionPath).where("value.intValue() < 50")
-    val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
-    val collectedObjs = rdd.collect()
-    // collectedObjs should have length expectedMap.size
-    matchMaps[String, Int](expectedMap, collectedObjs.toMap)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
deleted file mode 100644
index b8571d8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark
-
-import org.scalatest.Tag
-
-package object connector {
-
-  object OnlyTest extends Tag("OnlyTest")
-  object FetchDataTest extends Tag("FetchDateTest")
-  object FilterTest extends Tag("FilterTest")
-  object JoinTest extends Tag("JoinTest")
-  object OuterJoinTest extends Tag("OuterJoinTest")  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
deleted file mode 100644
index 18b2fd7..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.util.Properties
-
-trait GeodeCluster {
-  def startGeodeCluster(settings: Properties): Int = {
-    println("=== GeodeCluster start()")
-    GeodeCluster.start(settings)
-  }
-}
-
-object GeodeCluster {
-  private var geode: Option[GeodeRunner] = None
-
-  def start(settings: Properties): Int = {
-    geode.map(_.stopGeodeCluster()) // Clean up any old running Geode instances
-    val runner = new GeodeRunner(settings)
-    geode = Some(runner)
-    runner.getLocatorPort
-  }
-
-  def stop(): Unit = {
-    println("=== GeodeCluster shutdown: " + geode.toString)
-    geode match {
-      case None => println("Nothing to shutdown.")
-      case Some(runner) => runner.stopGeodeCluster()
-    }
-    geode = None
-    println("=== GeodeCluster shutdown finished.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
deleted file mode 100644
index 725a012..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.io.{IOException, File}
-import java.net.InetAddress
-import java.util.Properties
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.IOFileFilter
-
-/**
-* A class that manages Geode locator and servers.  Uses gfsh to
-* start and stop the locator and servers.
-*/
-class GeodeRunner(settings: Properties) {
-  val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString
-  val cacheXMLFile = settings.get("cache-xml-file")
-  val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt
-  val cwd = new File(".").getAbsolutePath
-  val geodeFunctionsTargetDir = new File("../geode-functions/target")
-  val testroot = "target/testgeode"
-  val classpath = new File(cwd, "target/scala-2.10/it-classes/")
-  val locatorPort = startGeodeCluster(numServers)
-
-  def getLocatorPort: Int = locatorPort
-
-  private def getCurrentDirectory = new File( "." ).getCanonicalPath
-  
-  private def startGeodeCluster(numServers: Int): Int = {
-    //ports(0) for Geode locator, the other ports are for Geode servers
-    val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
-    startGeodeLocator(ports(0), ports(1))
-    startGeodeServers(ports(0), ports.drop(2))
-    registerFunctions(ports(1))
-    ports(0)
-  }
-
-  private def startGeodeLocator(locatorPort: Int, jmxHttpPort:Int) {
-    println(s"=== GeodeRunner: starting locator on port $locatorPort")
-    val locatorDir = new File(cwd, s"$testroot/locator")
-    if (locatorDir.exists())
-      FileUtils.deleteDirectory(locatorDir)
-    IOUtils.mkdir(locatorDir)
-    new ProcessBuilder()
-      .command(gfshCmd, "start", "locator",
-        "--name=locator",
-        s"--dir=$locatorDir",
-        s"--port=$locatorPort",
-        s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
-      .inheritIO()
-      .start()
-
-    // Wait 30 seconds for locator to start
-    println(s"=== GeodeRunner: waiting for locator on port $locatorPort")
-    if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
-      throw new IOException("Failed to start Geode locator.")
-    println(s"=== GeodeRunner: done waiting for locator on port $locatorPort")
-  }
-
-  private def startGeodeServers(locatorPort: Int, serverPorts: Seq[Int]) {
-    val procs = for (i <- 0 until serverPorts.length) yield {
-      println(s"=== GeodeRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
-      val serverDir = new File(cwd, s"$testroot/server${i+1}")
-      if (serverDir.exists())
-        FileUtils.deleteDirectory(serverDir)
-      IOUtils.mkdir(serverDir)
-      new ProcessBuilder()
-        .command(gfshCmd, "start", "server",
-          s"--name=server${i+1}",
-          s"--locators=localhost[$locatorPort]",
-          s"--bind-address=localhost",
-          s"--server-port=${serverPorts(i)}",
-          s"--dir=$serverDir",
-          s"--cache-xml-file=$cacheXMLFile",
-          s"--classpath=$classpath")
-        .inheritIO()
-        .start()
-    }
-    procs.foreach(p => p.waitFor)
-    println(s"All $serverPorts.length servers have been started") 
-  }
-  
-  private def registerFunctions(jmxHttpPort:Int) {
-    import scala.collection.JavaConversions._
-    FileUtils.listFiles(geodeFunctionsTargetDir, fileFilter, dirFilter).foreach{  f => registerFunction(jmxHttpPort, f)}
-  }
-  
-  def fileFilter = new IOFileFilter {
-    def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("geode-functions")
-    def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("geode-functions")
-  }
-  
-  def dirFilter = new IOFileFilter {
-    def accept (file: File) = file.getName.startsWith("scala")
-    def accept (dir: File, name: String) = name.startsWith("scala")
-  }
-  
-  private def registerFunction(jmxHttpPort:Int, jar:File) {
-    println("Deploying:" + jar.getName)
-    import io.pivotal.geode.spark.connector.GeodeFunctionDeployer
-    val deployer = new GeodeFunctionDeployer(new HttpClient())
-    deployer.deploy("localhost", jmxHttpPort, jar)
-  }
-
-  def stopGeodeCluster(): Unit = {
-    stopGeodeServers(numServers)
-    stopGeodeLocator()
-    if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
-      throw new IOException(s"Failed to stop Geode locator at port $getLocatorPort.")
-    println(s"Successfully stop Geode locator at port $getLocatorPort.")
-  }
-
-  private def stopGeodeLocator() {
-    println(s"=== GeodeRunner: stop locator")
-    val p = new ProcessBuilder()
-      .inheritIO()
-      .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
-      .start()
-     p.waitFor()
-  }
-
-  private def stopGeodeServers(numServers: Int) {
-   val procs = for (i <-1 to numServers) yield {
-       println(s"=== GeodeRunner: stop server $i.")
-       new ProcessBuilder()
-        .inheritIO()
-        .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
-        .start()
-   }
-   procs.foreach(p => p.waitFor())
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
deleted file mode 100644
index 6d667e9..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.io.{File, IOException}
-import java.net.{InetAddress, Socket}
-import org.apache.geode.internal.AvailablePort
-import scala.util.Try
-import org.apache.log4j.PropertyConfigurator
-import java.util.Properties
-
-object IOUtils {
-
-  /** Makes a new directory or throws an `IOException` if it cannot be made */
-  def mkdir(dir: File): File = {
-    if (!dir.mkdirs())
-      throw new IOException(s"Could not create dir $dir")
-    dir
-  }
-
-  private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually {
-    Try {
-      Thread.sleep(100)
-      new Socket(host, port).close()
-    }
-  }
-  
-  /**
-   * Waits until a port at the given address is open or timeout passes.
-   * @return true if managed to connect to the port, false if timeout happened first
-   */
-  def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    socketPortProb(host, port)
-      .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout)
-      .next()
-      .isSuccess
-  }
-
-  /**
-   * Waits until a port at the given address is close or timeout passes.
-   * @return true if host:port is un-connect-able, false if timeout happened first
-   */
-  def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    socketPortProb(host, port)
-      .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout)
-      .next()
-      .isFailure
-  }
-
-  /**
-   * Returns array of unique randomly available tcp ports of specified count.
-   */
-  def getRandomAvailableTCPPorts(count: Int): Seq[Int] =
-    (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET))
-      .map{x => x.release(); x.getPort}.toArray
-
-  /**
-   * config a log4j properties used for integration tests
-   */
-  def configTestLog4j(level: String, props: (String, String)*): Unit = {
-    val pro = new Properties()
-    props.foreach(p => pro.put(p._1, p._2))
-    configTestLog4j(level, pro)
-  }
-
-  def configTestLog4j(level: String, props: Properties): Unit = {
-    val pro = new Properties()
-    pro.put("log4j.rootLogger", s"$level, console")
-    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
-    pro.put("log4j.appender.console.target", "System.err")
-    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
-    pro.put("log4j.appender.console.layout.ConversionPattern",
-      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
-    pro.putAll(props)
-    PropertyConfigurator.configure(pro)
-    
-  }
-}