Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:18 UTC
[43/50] [abbrv] incubator-geode git commit: GEODE-1244: Package,
directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
new file mode 100644
index 0000000..d6f8d1f
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * Represents a number of shares of a stock ("security") held in a {@link
+ * Portfolio}.
+ * <p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system. Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ *
+ */
+public class Position implements Declarable, Serializable {
+
+ private static final long serialVersionUID = -8229531542107983344L;
+
+ private String secId;
+ private double qty;
+ private double mktValue;
+
+ public Position(Properties props) {
+ init(props);
+ }
+
+ @Override
+ public void init(Properties props) {
+ this.secId = props.getProperty("secId");
+ this.qty = Double.parseDouble(props.getProperty("qty"));
+ this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
+ }
+
+ public String getSecId(){
+ return this.secId;
+ }
+
+ public double getQty(){
+ return this.qty;
+ }
+
+ public double getMktValue() {
+ return this.mktValue;
+ }
+
+ @Override
+ public String toString(){
+ return new StringBuilder()
+ .append("Position [secId=").append(secId)
+ .append(" qty=").append(this.qty)
+ .append(" mktValue=").append(mktValue).append("]").toString();
+ }
+}
+
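Because Position implements Declarable, Geode can instantiate it from a declaration in cache.xml, passing the declared parameters to init() as a java.util.Properties. A minimal sketch of the equivalent programmatic construction in Scala (the property values below are made up for illustration):

    val props = new java.util.Properties()
    props.setProperty("secId", "IBM")      // security id
    props.setProperty("qty", "100")        // init() parses this via Double.parseDouble
    props.setProperty("mktValue", "42.5")  // likewise parsed as a double
    val position = new Position(props)     // the constructor delegates to init(props)
    assert(position.getSecId == "IBM")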
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
new file mode 100644
index 0000000..79893d6
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+ "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+ "http://www.gemstone.com/dtd/cache6_5.dtd" >
+
+<cache>
+ <!-- test region for OQL test -->
+ <region name="obj_obj_region" refid="PARTITION_REDUNDANT" />
+
+ <region name="obj_obj_rep_region" refid="REPLICATE" />
+
+ <region name="str_int_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="str_str_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.String</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="str_str_rep_region" refid="REPLICATE">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.String</value-constraint>
+ </region-attributes>
+ </region>
+</cache>
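The refid values above (PARTITION_REDUNDANT, REPLICATE) are Geode's built-in region configuration shortcuts, and the key-constraint/value-constraint elements make the servers enforce entry types. A small sketch of how the tests below use one of these regions, assuming a connection to a cluster started with this file:

    val conn = GeodeConnectionConf(sc.getConf).getConnection
    val rgn: Region[String, Integer] = conn.getRegionProxy("str_int_region")
    rgn.put("one", 1)  // accepted: matches the declared String/Integer constraints
    // a non-Integer value here would violate the java.lang.Integer
    // value-constraint and be rejected by the server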
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
new file mode 100644
index 0000000..3023959
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+ "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+ "http://www.gemstone.com/dtd/cache6_5.dtd" >
+
+<cache>
+ <!-- combinations of key, value types with region types -->
+ <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" />
+ <region name="pr_obj_obj_region" refid="PARTITION" />
+ <region name="rr_obj_obj_region" refid="REPLICATE" />
+ <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" />
+
+ <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="pr_str_int_region" refid="PARTITION">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="rr_str_int_region" refid="REPLICATE">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+</cache>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
new file mode 100644
index 0000000..a26bcbd
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector
+
+import java.util.Properties
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import io.pivotal.geode.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager}
+import io.pivotal.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD}
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import scala.collection.JavaConversions
+import scala.reflect.ClassTag
+
+case class Number(str: String, len: Int)
+
+class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
+
+ var sc: SparkContext = null
+
+ override def beforeAll() {
+ // start geode cluster, and spark context
+ val settings = new Properties()
+ settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
+ settings.setProperty("num-of-servers", "2")
+ val locatorPort = GeodeCluster.start(settings)
+
+ // start spark context in local mode
+ IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+ "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
+ val conf = new SparkConf()
+ .setAppName("BasicIntegrationTest")
+ .setMaster("local[2]")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator")
+
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ // stop connection, spark context, and geode cluster
+ DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
+ sc.stop()
+ GeodeCluster.stop()
+ }
+
+ //Convert Map[Object, Object] to java.util.Properties
+ private def map2Props(map: Map[Object, Object]): java.util.Properties =
+ (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
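+ // Sketch: a more explicit equivalent of map2Props (/: is Scala's foldLeft operator)
+ private def map2PropsExplicit(map: Map[Object, Object]): java.util.Properties = {
+ val props = new java.util.Properties()
+ map.foreach { case (k, v) => props.put(k, v) } // copy each entry into the Properties
+ props
+ }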
+
+ // ===========================================================
+ // DefaultGeodeConnection functional tests
+ // ===========================================================
+
+ test("DefaultGeodeConnection.validateRegion()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ // existing region
+ var regionPath: String = "str_str_region"
+ conn.validateRegion[String, String](regionPath)
+
+ // non-existent region
+ regionPath = "non_exist_region"
+ try {
+ conn.validateRegion[String, String](regionPath)
+ fail("validateRegion failed to catch non-exist region error")
+ } catch {
+ case e: RuntimeException =>
+ if (! e.getMessage.contains(s"The region named $regionPath was not found"))
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ case e: Throwable =>
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ }
+
+ // Note: currently there is no way to catch a type mismatch error
+ conn.validateRegion[String, Integer]("str_str_region")
+ }
+
+ test("DefaultGeodeConnection.getRegionMetadata()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ // existing regions; 113 is Geode's default total bucket count for a partitioned region
+ validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
+ validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
+ validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
+
+ // non-existent region
+ assert(! conn.getRegionMetadata("no_exist_region").isDefined)
+ }
+
+ def validateRegionMetadata(
+ conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int,
+ keyType: String, valueType: String, emptyMap: Boolean): Unit = {
+
+ val mdOption = conn.getRegionMetadata(regionPath)
+ val md = mdOption.get
+
+ assert(md.getRegionPath == s"/$regionPath")
+ assert(md.isPartitioned == partitioned)
+ assert(md.getKeyTypeName == keyType)
+ assert(md.getValueTypeName == valueType)
+ assert(md.getTotalBuckets == buckets)
+ if (emptyMap) assert(md.getServerBucketMap == null)
+ else assert(md.getServerBucketMap != null)
+ }
+
+ test("DefaultGeodeConnection.getRegionProxy()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ val region1 = conn.getRegionProxy[String, String]("str_str_region")
+ region1.put("1", "One")
+ assert(region1.get("1") == "One")
+ region1.remove("1")
+ assert(region1.get("1") == null)
+
+ // getRegionProxy itself doesn't fail when the region doesn't exist, but subsequent operations do
+ val region2 = conn.getRegionProxy[String, String]("non_exist_region")
+ try {
+ region2.put("1", "One")
+ fail("getRegionProxy failed to catch non-exist region error")
+ } catch {
+ case e: Exception =>
+ if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) {
+ e.printStackTrace()
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ }
+ }
+ }
+
+ // Note: DefaultGeodeConnection.getQuery() and getRegionData() are covered by
+ // RetrieveRegionIntegrationTest.scala and the following OQL tests.
+
+ // ===========================================================
+ // OQL functional tests
+ // ===========================================================
+
+ private def initRegion(regionName: String): Unit = {
+
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+
+ //This calls the implicit conversion map2Properties in the connector package object, since the argument is a Map[String, String]
+ var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
+ var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
+ val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("1", portfolio1)
+
+ position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925"))
+ position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972"))
+ val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("2", portfolio2)
+
+ position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
+ position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
+ val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("3", portfolio3)
+
+ position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("4", portfolio4)
+
+ position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("5", portfolio5)
+
+ position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("6", portfolio6)
+
+ position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
+ position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
+ val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ //Not using null, due to an intermittent query failure on columns containing null; likely a Spark SQL bug
+ //portfolio7.setType(null)
+ rgn.put("7", portfolio7)
+ }
+
+ private def getQueryRDD[T: ClassTag](
+ query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] =
+ new QueryRDD[T](sc, query, connConf)
+
+ test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") {
+ simpleQuery("obj_obj_region")
+ }
+
+ test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
+ simpleQuery("obj_obj_rep_region")
+ }
+
+ private def simpleQuery(regionName: String) {
+ //Populate some data in the region
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")
+
+ //verify the QueryRDD
+ val oqlRS: Array[String] = OQLResult.collect()
+ oqlRS should have length 3
+ oqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ // this is used to implicitly convert an RDD to a DataFrame.
+ import sqlContext.implicits._
+ val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
+ //Register dataFrame as a table of two columns of type String and Int respectively
+ dataFrame.registerTempTable("numberTable")
+
+ //Issue SQL query against the table
+ val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+ //Verify the SQL query result; r(0) means column 0
+ val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 3
+ sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+ //Register dataFrame2 as a table of two columns of type String and Int respectively
+ dataFrame2.registerTempTable("numberTable2")
+
+ //Issue SQL query against the table
+ val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
+ //Verify the SQL query result; r(0) means column 0
+ val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
+ sqlRS2 should have length 3
+ sqlRS2 should contain theSameElementsAs List("one", "two", "three")
+
+ //Remove the region entries, because other tests might use the same region as well
+ List("1", "2", "3").foreach(rgn.remove)
+ }
+
+ test("Run Geode OQL query and directly return DataFrame: Partitioned Region") {
+ simpleQueryDataFrame("obj_obj_region")
+ }
+
+ test("Run Geode OQL query and directly return DataFrame: Replicated Region") {
+ simpleQueryDataFrame("obj_obj_rep_region")
+ }
+
+ private def simpleQueryDataFrame(regionName: String) {
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName")
+ dataFrame.registerTempTable("numberTable")
+
+ //Issue SQL query against the table
+ val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+ //Verify the SQL query result; r(0) means column 0
+ val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 3
+ sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Remove the region entries, because other tests might use the same region as well
+ List("1", "2", "3").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with UDT: Partitioned Region") {
+ queryUDT("obj_obj_region")
+ }
+
+ test("Geode OQL query with UDT: Replicated Region") {
+ queryUDT("obj_obj_rep_region")
+ }
+
+ private def queryUDT(regionName: String) {
+
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ val e1: Employee = new Employee("hello", 123)
+ val e2: Employee = new Employee("world", 456)
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Object] = OQLResult.collect()
+ oqlRS should have length 2
+ oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+ dataFrame.registerTempTable("employee")
+ val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 2
+ sqlRS should contain theSameElementsAs List("hello", "world")
+
+ List("1", "2").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") {
+ queryUDTDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") {
+ queryUDTDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryUDTDataFrame(regionName: String) {
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ val e1: Employee = new Employee("hello", 123)
+ val e2: Employee = new Employee("world", 456)
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName")
+
+ dataFrame.registerTempTable("employee")
+ val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 2
+ sqlRS should contain theSameElementsAs List("hello", "world")
+
+ List("1", "2").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with more complex UDT: Partitioned Region") {
+ complexUDT("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT: Replicated Region") {
+ complexUDT("obj_obj_rep_region")
+ }
+
+ private def complexUDT(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
+ oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+ sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+ }
+
+ test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") {
+ complexUDTDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") {
+ complexUDTDataFrame("obj_obj_rep_region")
+ }
+
+ private def complexUDTDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+ sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection: Partitioned Region") {
+ queryComplexUDTProjection("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection: Replicated Region") {
+ queryComplexUDTProjection("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTProjection(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
+ oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") {
+ queryComplexUDTProjectionDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") {
+ queryComplexUDTProjectionDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTProjectionDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") {
+ queryComplexUDTNestProjectionDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") {
+ queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTNestProjectionDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Undefined instance deserialization: Partitioned Region") {
+ undefinedInstanceDeserialization("obj_obj_region")
+ }
+
+ test("Undefined instance deserialization: Replicated Region") {
+ undefinedInstanceDeserialization("obj_obj_rep_region")
+ }
+
+ private def undefinedInstanceDeserialization(regionName: String) {
+
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+
+ //Put some new data
+ rgn.put("1", "one")
+
+ //Query some non-existent columns, which should return UNDEFINED
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName")
+ val col1 = dataFrame.first().apply(0)
+ val col2 = dataFrame.first().apply(1)
+ assert(col1 == QueryService.UNDEFINED)
+ assert(col2 == QueryService.UNDEFINED)
+ //Verify that col1 and col2 refer to the same Undefined object
+ assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
+ }
+
+ test("RDD.saveToGeode") {
+ val regionName = "str_str_region"
+ // generates: Vector(("1","11"), ("2","22"), ("3","33"), ("4","44"), ("5","55"), ("6","66"))
+ val data = (1 to 6).map(_.toString).map(e=> (e, e*2))
+ val rdd = sc.parallelize(data)
+ rdd.saveToGeode(regionName)
+
+ // verify
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
+ println("region key set on server: " + region.keySetOnServer())
+ assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+ (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
+ }
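+ // Sketch: for a non-pair RDD, a key/value-generating function can be supplied
+ // (assuming the RDD overload analogous to the DStream variant used below):
+ //   val words = sc.parallelize(Seq("apple", "banana"))
+ //   words.saveToGeode[String, Int]("str_int_region", w => (w, w.length))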
+
+ // ===========================================================
+ // DStream.saveToGeode() functional tests
+ // ===========================================================
+
+ test("Basic DStream test") {
+ import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
+ import io.pivotal.geode.spark.connector.streaming._
+ import org.apache.spark.streaming.ManualClockHelper
+
+ class TestStreamListener extends StreamingListener {
+ var count = 0
+ override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1
+ }
+
+ def batchDuration = Seconds(1)
+ val ssc = new StreamingContext(sc, batchDuration)
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ val dstream = new TestInputDStream(ssc, input, 2)
+ dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e))
+ try {
+ val listener = new TestStreamListener
+ ssc.addStreamingListener(listener)
+ ssc.start()
+ ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
+ while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
+ } catch {
+ case e: Exception => e.printStackTrace(); throw e
+// } finally {
+// ssc.stop()
+ }
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val region: Region[String, Int] = conn.getRegionProxy("str_int_region")
+
+ // verify geode region contents
+ println("region key set on server: " + region.keySetOnServer())
+ assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+ (1 to 12).foreach(e => assert(e == region.get(e.toString)))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
new file mode 100644
index 0000000..b7a1dda
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.geode.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
+
+ var sc: SparkContext = null
+ val numServers = 3
+ val numObjects = 1000
+
+ override def beforeAll() {
+ // start geode cluster, and spark context
+ val settings = new Properties()
+ settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
+ settings.setProperty("num-of-servers", numServers.toString)
+ val locatorPort = GeodeCluster.start(settings)
+
+ // start spark context in local mode
+ IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+ "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
+ val conf = new SparkConf()
+ .setAppName("RDDJoinRegionIntegrationTest")
+ .setMaster("local[2]")
+ .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ // stop connection, spark context, and geode cluster
+ DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
+ sc.stop()
+ GeodeCluster.stop()
+ }
+
+// def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+// assert(map1.size == map2.size)
+// map1.foreach(e => {
+// assert(map2.contains(e._1))
+// assert (e._2 == map2.get(e._1).get)
+// })
+// }
+
+ // --------------------------------------------------------------------------------------------
+ // PairRDD.joinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
+ // --------------------------------------------------------------------------------------------
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
+ verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
+ }
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
+ verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
+ }
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
+ verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
+ }
+
+ def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => ("k_" + x, x*2))
+ val rdd = sc.parallelize(data)
+
+ val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, connConf)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
+ // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+ // ------------------------------------------------------------------------------------------------------
+ // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
+ // -------------------------------------------------------------------------------------------------------
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
+ verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
+ }
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
+ verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
+ }
+
+ test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
+ verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
+ }
+
+ def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => (x, x*2))
+ val rdd = sc.parallelize(data)
+
+ val func :((Int, Int)) => String = pair => s"k_${pair._1}"
+
+ val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, func /*, connConf*/)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
+ // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+ // ------------------------------------------------------------------------------------------------
+ // PairRDD.outerJoinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
+ // ------------------------------------------------------------------------------------------------
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
+ }
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
+ }
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
+ }
+
+ def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => ("k_" + x, x*2))
+ val rdd = sc.parallelize(data)
+
+ val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath /*, connConf*/)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (-5 until 50).map {
+ i => if (i < 0) ((s"k_$i", i * 2), None)
+ else ((s"k_$i", i*2), Some(i))}.toMap
+ // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+ // ------------------------------------------------------------------------------------------------------
+ // PairRDD.outerJoinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
+ // -------------------------------------------------------------------------------------------------------
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
+ }
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
+ }
+
+ test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
+ verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
+ }
+
+ def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => (x, x*2))
+ val rdd = sc.parallelize(data)
+
+ val func :((Int, Int)) => String = pair => s"k_${pair._1}"
+
+ val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, func, connConf)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (-5 until 50).map {
+ i => if (i < 0) ((i, i * 2), None)
+ else ((i, i*2), Some(i))}.toMap
+ // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // RDD.joinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
+ // --------------------------------------------------------------------------------------------
+
+ test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
+ verifyRDDJoinRegion("rr_str_int_region")
+ }
+
+ test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
+ verifyRDDJoinRegion("pr_str_int_region")
+ }
+
+ test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
+ verifyRDDJoinRegion("pr_r_str_int_region")
+ }
+
+ def verifyRDDJoinRegion(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => s"k_$x")
+ val rdd = sc.parallelize(data)
+
+ val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, x => x, connConf)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
+ // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // RDD.outerJoinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
+ // --------------------------------------------------------------------------------------------
+
+ test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) {
+ verifyRDDOuterJoinRegion("rr_str_int_region")
+ }
+
+ test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) {
+ verifyRDDOuterJoinRegion("pr_str_int_region")
+ }
+
+ test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) {
+ verifyRDDOuterJoinRegion("pr_r_str_int_region")
+ }
+
+ def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("k_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val data = (-5 until 50).map(x => s"k_$x")
+ val rdd = sc.parallelize(data)
+
+ val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, x => x /*, connConf */)
+ val rdd2Content = rdd2.collect()
+
+ val expectedMap = (-5 until 50).map {
+ i => if (i < 0) (s"k_$i", None)
+ else (s"k_$i", Some(i))}.toMap
+ // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
+ assert(expectedMap == rdd2Content.toMap)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
new file mode 100644
index 0000000..1ad843e
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.geode.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+
+class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
+
+ var sc: SparkContext = null
+ val numServers = 4
+ val numObjects = 1000
+
+ override def beforeAll() {
+ // start geode cluster, and spark context
+ val settings = new Properties()
+ settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
+ settings.setProperty("num-of-servers", numServers.toString)
+ val locatorPort = GeodeCluster.start(settings)
+
+ // start spark context in local mode
+ IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+ "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
+ val conf = new SparkConf()
+ .setAppName("RetrieveRegionIntegrationTest")
+ .setMaster("local[2]")
+ .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ // stop connection, spark context, and geode cluster
+ DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
+ sc.stop()
+ GeodeCluster.stop()
+ }
+
+ def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
+ //Populate some data in the region
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[K, V] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+ verifyRetrieveRegion[K,V](regionName, entriesMap)
+ }
+
+ def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = {
+ val rdd = sc.geodeRegion(regionName)
+ val collectedObjs = rdd.collect()
+ collectedObjs should have length entriesMap.size
+ import scala.collection.JavaConverters._
+ matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
+ }
+
+ def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+ assert(map1.size == map2.size)
+ map1.foreach(e => {
+ assert(map2.contains(e._1))
+ assert (e._2 == map2.get(e._1).get)
+ }
+ )
+ }
+
+ //Retrieve region for a partitioned region where some nodes are empty (empty iterator).
+ //This test has to run first; the rest of the tests always use the same number of objects.
+ test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
+ val numObjects = numServers - 1
+ val entriesMap:JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("key_" + i, i))
+ executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
+ }
+
+ //Test for retrieving from region containing string key and string value
+ def verifyRetrieveStringStringRegion(regionName:String) = {
+ val entriesMap:JHashMap[String, String] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("key_" + i, "value_" + i))
+ executeTest[String, String](regionName, numObjects, entriesMap)
+ }
+
+ test("Retrieve Region with replicate redundant string string") {
+ verifyRetrieveStringStringRegion("rr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned string string") {
+ verifyRetrieveStringStringRegion("pr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned redundant string string") {
+ verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
+ }
+
+
+ //Test for retrieving from region containing string key and int value
+ def verifyRetrieveStringIntRegion(regionName:String) = {
+ val entriesMap:JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("key_" + i, i))
+ executeTest[String, Int](regionName, numObjects, entriesMap)
+ }
+
+ test("Retrieve Region with replicate string int region") {
+ verifyRetrieveStringIntRegion("rr_str_int_region")
+ }
+
+ test("Retrieve Region with partitioned string int region") {
+ verifyRetrieveStringIntRegion("pr_str_int_region")
+ }
+
+ test("Retrieve Region with partitioned redundant string int region") {
+ verifyRetrieveStringIntRegion("pr_r_str_int_region")
+ }
+
+ //Tests for retrieving from region containing string key and object value
+ def verifyRetrieveStringObjectRegion(regionName:String) = {
+ val entriesMap:JHashMap[String, Object] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
+ executeTest[String, Object](regionName, numObjects, entriesMap)
+ }
+
+ test("Retrieve Region with replicate string obj") {
+ verifyRetrieveStringObjectRegion("rr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned string obj") {
+ verifyRetrieveStringObjectRegion("pr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned redundant string obj") {
+ verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
+ }
+
+ //Test for retrieving from region containing string key and map value
+ def verifyRetrieveStringMapRegion(regionName:String) = {
+ val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
+ (0 until numObjects).foreach(i => {
+ val hashMap:JHashMap[String, String] = new JHashMap()
+ hashMap.put("mapKey:" + i, "mapValue:" + i)
+ entriesMap.put("key_" + i, hashMap)
+ })
+ executeTest(regionName, numObjects, entriesMap)
+ }
+
+ test("Retrieve Region with replicate string map region") {
+ verifyRetrieveStringMapRegion("rr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned string map region") {
+ verifyRetrieveStringMapRegion("pr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partitioned redundant string map region") {
+ verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
+ }
+
+ //Test and helpers specific for retrieving from region containing string key and byte[] value
+ def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
+ //Populate some data in the region
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
+ rgn.putAll(entriesMap)
+ verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
+ }
+
+ def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = {
+ val rdd = sc.geodeRegion(regionName)
+ val collectedObjs = rdd.collect()
+ collectedObjs should have length entriesMap.size
+ import scala.collection.JavaConverters._
+ matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
+ }
+
+ def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
+ map1.foreach(e => {
+ assert(map2.contains(e._1))
+ assert (java.util.Arrays.equals(e._2, map2.get(e._1).get))
+ }
+ )
+ assert(map1.size == map2.size)
+
+ }
+
+ def verifyRetrieveStringByteArrayRegion(regionName:String) = {
+ val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
+ (0 until numObjects).foreach(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
+ executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
+ }
+
+ test("Retrieve Region with replicate region string byte[] region") {
+ verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partition region string byte[] region") {
+ verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
+ }
+
+ test("Retrieve Region with partition redundant region string byte[] region") {
+ verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
+ }
+
+ test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
+ verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
+ }
+
+ test("Retrieve Region with where clause on partitioned region", FilterTest) {
+ verifyRetrieveRegionWithWhereClause("pr_str_int_region")
+ }
+
+ test("Retrieve Region with where clause on replicated region", FilterTest) {
+ verifyRetrieveRegionWithWhereClause("rr_str_int_region")
+ }
+
+ def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
+ val entriesMap: JHashMap[String, Int] = new JHashMap()
+ (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(entriesMap)
+
+ val rdd = sc.geodeRegion(regionPath).where("value.intValue() < 50")
+ val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
+ val collectedObjs = rdd.collect()
+ // collectedObjs should have length expectedMap.size
+ matchMaps[String, Int](expectedMap, collectedObjs.toMap)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
new file mode 100644
index 0000000..b8571d8
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark
+
+import org.scalatest.Tag
+
+package object connector {
+
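+ // Tags for grouping integration tests. With the ScalaTest runner they can
+ // be included or excluded via tag arguments, e.g. "-n FilterTest" to run
+ // only tests tagged FilterTest, or "-l OnlyTest" to exclude OnlyTest
+ // (runner flags shown for illustration).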
+ object OnlyTest extends Tag("OnlyTest")
+ object FetchDataTest extends Tag("FetchDataTest")
+ object FilterTest extends Tag("FilterTest")
+ object JoinTest extends Tag("JoinTest")
+ object OuterJoinTest extends Tag("OuterJoinTest")
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
new file mode 100644
index 0000000..18b2fd7
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector.testkit
+
+import java.util.Properties
+
+trait GeodeCluster {
+ def startGeodeCluster(settings: Properties): Int = {
+ println("=== GeodeCluster start()")
+ GeodeCluster.start(settings)
+ }
+}
+
+object GeodeCluster {
+ private var geode: Option[GeodeRunner] = None
+
+ def start(settings: Properties): Int = {
+ geode.foreach(_.stopGeodeCluster()) // Clean up any old running Geode instances
+ val runner = new GeodeRunner(settings)
+ geode = Some(runner)
+ runner.getLocatorPort
+ }
+
+ def stop(): Unit = {
+ println("=== GeodeCluster shutdown: " + geode.toString)
+ geode match {
+ case None => println("Nothing to shutdown.")
+ case Some(runner) => runner.stopGeodeCluster()
+ }
+ geode = None
+ println("=== GeodeCluster shutdown finished.")
+ }
+}
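+
+// A minimal usage sketch (suite wiring is illustrative; the property keys
+// match those read by GeodeRunner):
+//
+//   val settings = new Properties()
+//   settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
+//   settings.setProperty("num-of-servers", "2")
+//   val locatorPort = GeodeCluster.start(settings)
+//   try { /* run tests against localhost[locatorPort] */ }
+//   finally GeodeCluster.stop()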
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
new file mode 100644
index 0000000..725a012
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector.testkit
+
+import java.io.{IOException, File}
+import java.net.InetAddress
+import java.util.Properties
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.IOFileFilter
+
+/**
+* A class that manages Geode locator and servers. Uses gfsh to
+* start and stop the locator and servers.
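+*
+* The settings are expected to provide "cache-xml-file" (server cache
+* configuration) and "num-of-servers" (number of servers to start); the
+* cluster is started eagerly from the constructor.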
+*/
+class GeodeRunner(settings: Properties) {
+ val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString
+ val cacheXMLFile = settings.getProperty("cache-xml-file")
+ val numServers: Int = settings.getProperty("num-of-servers").toInt
+ val cwd = new File(".").getAbsolutePath
+ val geodeFunctionsTargetDir = new File("../geode-functions/target")
+ val testroot = "target/testgeode"
+ val classpath = new File(cwd, "target/scala-2.10/it-classes/")
+ val locatorPort = startGeodeCluster(numServers)
+
+ def getLocatorPort: Int = locatorPort
+
+ private def getCurrentDirectory = new File(".").getCanonicalPath
+
+ private def startGeodeCluster(numServers: Int): Int = {
+ // ports(0) is for the Geode locator, ports(1) for the JMX manager HTTP port, and the rest for Geode servers
+ val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
+ startGeodeLocator(ports(0), ports(1))
+ startGeodeServers(ports(0), ports.drop(2))
+ registerFunctions(ports(1))
+ ports(0)
+ }
+
+ private def startGeodeLocator(locatorPort: Int, jmxHttpPort:Int) {
+ println(s"=== GeodeRunner: starting locator on port $locatorPort")
+ val locatorDir = new File(cwd, s"$testroot/locator")
+ if (locatorDir.exists())
+ FileUtils.deleteDirectory(locatorDir)
+ IOUtils.mkdir(locatorDir)
+ new ProcessBuilder()
+ .command(gfshCmd, "start", "locator",
+ "--name=locator",
+ s"--dir=$locatorDir",
+ s"--port=$locatorPort",
+ s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
+ .inheritIO()
+ .start()
+
+ // Wait 30 seconds for locator to start
+ println(s"=== GeodeRunner: waiting for locator on port $locatorPort")
+ if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
+ throw new IOException("Failed to start Geode locator.")
+ println(s"=== GeodeRunner: done waiting for locator on port $locatorPort")
+ }
+
+ private def startGeodeServers(locatorPort: Int, serverPorts: Seq[Int]) {
+ val procs = for (i <- 0 until serverPorts.length) yield {
+ println(s"=== GeodeRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
+ val serverDir = new File(cwd, s"$testroot/server${i+1}")
+ if (serverDir.exists())
+ FileUtils.deleteDirectory(serverDir)
+ IOUtils.mkdir(serverDir)
+ new ProcessBuilder()
+ .command(gfshCmd, "start", "server",
+ s"--name=server${i+1}",
+ s"--locators=localhost[$locatorPort]",
+ s"--bind-address=localhost",
+ s"--server-port=${serverPorts(i)}",
+ s"--dir=$serverDir",
+ s"--cache-xml-file=$cacheXMLFile",
+ s"--classpath=$classpath")
+ .inheritIO()
+ .start()
+ }
+ procs.foreach(_.waitFor())
+ println(s"All ${serverPorts.length} servers have been started")
+ }
+
+ private def registerFunctions(jmxHttpPort:Int) {
+ import scala.collection.JavaConversions._
+ FileUtils.listFiles(geodeFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)}
+ }
+
+ def fileFilter = new IOFileFilter {
+ def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("geode-functions")
+ def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("geode-functions")
+ }
+
+ def dirFilter = new IOFileFilter {
+ def accept (file: File) = file.getName.startsWith("scala")
+ def accept (dir: File, name: String) = name.startsWith("scala")
+ }
+
+ private def registerFunction(jmxHttpPort:Int, jar:File) {
+ println("Deploying:" + jar.getName)
+ import io.pivotal.geode.spark.connector.GeodeFunctionDeployer
+ val deployer = new GeodeFunctionDeployer(new HttpClient())
+ deployer.deploy("localhost", jmxHttpPort, jar)
+ }
+
+ def stopGeodeCluster(): Unit = {
+ stopGeodeServers(numServers)
+ stopGeodeLocator()
+ if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
+ throw new IOException(s"Failed to stop Geode locator at port $getLocatorPort.")
+ println(s"Successfully stop Geode locator at port $getLocatorPort.")
+ }
+
+ private def stopGeodeLocator() {
+ println(s"=== GeodeRunner: stop locator")
+ val p = new ProcessBuilder()
+ .inheritIO()
+ .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
+ .start()
+ p.waitFor()
+ }
+
+ private def stopGeodeServers(numServers: Int) {
+ val procs = for (i <- 1 to numServers) yield {
+ println(s"=== GeodeRunner: stopping server $i.")
+ new ProcessBuilder()
+ .inheritIO()
+ .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
+ .start()
+ }
+ procs.foreach(p => p.waitFor())
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
new file mode 100644
index 0000000..21a9232
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector.testkit
+
+import java.io.{File, IOException}
+import java.net.{InetAddress, Socket}
+import com.gemstone.gemfire.internal.AvailablePort
+import scala.util.Try
+import org.apache.log4j.PropertyConfigurator
+import java.util.Properties
+
+object IOUtils {
+
+ /** Makes a new directory or throws an `IOException` if it cannot be made */
+ def mkdir(dir: File): File = {
+ if (!dir.mkdirs())
+ throw new IOException(s"Could not create dir $dir")
+ dir
+ }
+
+ private def socketPortProbe(host: InetAddress, port: Int) = Iterator.continually {
+ Try {
+ Thread.sleep(100)
+ new Socket(host, port).close()
+ }
+ }
+
+ /**
+ * Waits until a port at the given address is open or the timeout passes.
+ * @return true if a connection to the port was made, false if the timeout expired first
+ */
+ def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = {
+ val startTime = System.currentTimeMillis()
+ socketPortProbe(host, port)
+ .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout)
+ .next()
+ .isSuccess
+ }
+
+ /**
+ * Waits until the port at the given address is closed or the timeout passes.
+ * @return true if host:port is no longer connectable, false if the timeout expired first
+ */
+ def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = {
+ val startTime = System.currentTimeMillis()
+ socketPortProbe(host, port)
+ .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout)
+ .next()
+ .isFailure
+ }
+
+ /**
+ * Returns `count` unique, randomly chosen available TCP ports.
+ */
+ def getRandomAvailableTCPPorts(count: Int): Seq[Int] =
+ (0 until count).map(_ => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET))
+ .map { keeper => keeper.release(); keeper.getPort }
+
+ /**
+ * Configures log4j properties used by the integration tests.
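+ * For example (logger name shown for illustration):
+ * configTestLog4j("ERROR", ("log4j.logger.org.apache.spark", "WARN"))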
+ */
+ def configTestLog4j(level: String, props: (String, String)*): Unit = {
+ val pro = new Properties()
+ props.foreach(p => pro.put(p._1, p._2))
+ configTestLog4j(level, pro)
+ }
+
+ def configTestLog4j(level: String, props: Properties): Unit = {
+ val pro = new Properties()
+ pro.put("log4j.rootLogger", s"$level, console")
+ pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
+ pro.put("log4j.appender.console.target", "System.err")
+ pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
+ pro.put("log4j.appender.console.layout.ConversionPattern",
+ "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
+ pro.putAll(props)
+ PropertyConfigurator.configure(pro)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
new file mode 100644
index 0000000..67f9e57
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming
+
+import org.apache.spark.util.ManualClock
+
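+/**
+ * Advances Spark Streaming's ManualClock so tests can trigger batch
+ * generation deterministically. The StreamingContext must be configured
+ * to use the manual clock first, typically via
+ * sparkConf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock").
+ */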
+object ManualClockHelper {
+
+ def addToTime(ssc: StreamingContext, timeToAdd: Long): Unit = {
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(timeToAdd)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
new file mode 100644
index 0000000..fce1e67
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.reflect.ClassTag
+
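+/**
+ * An InputDStream that replays a predefined sequence of batches: each call
+ * to compute() turns the next element of `input` into an RDD with
+ * `numPartitions` partitions, and yields an empty RDD once the input is
+ * exhausted.
+ */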
+class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+ extends InputDStream[T](ssc_) {
+
+ def start() {}
+
+ def stop() {}
+
+ def compute(validTime: Time): Option[RDD[T]] = {
+ logInfo("Computing RDD for time " + validTime)
+ val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+ val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+ // lets us test cases where RDDs are not created
+ if (selectedInput == null)
+ return None
+
+ val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
+ logInfo("Created RDD " + rdd.id + " with " + selectedInput)
+ Some(rdd)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
new file mode 100644
index 0000000..e7c7cf9
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
+ * to provide Geode Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
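+ *
+ * <p>A usage sketch (the factory method name and region path are assumed
+ * for illustration):
+ * <pre>
+ * {@code
+ * JavaDStream<String> lines = ...;
+ * GeodeJavaUtil.javaFunctions(lines)
+ *     .saveToGeode("str_str_region", s -> new Tuple2<>(s, s));
+ * }
+ * </pre></p>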
+ */
+public class GeodeJavaDStreamFunctions<T> {
+
+ public final GeodeDStreamFunctions<T> dsf;
+
+ public GeodeJavaDStreamFunctions(JavaDStream<T> ds) {
+ this.dsf = new GeodeDStreamFunctions<T>(ds.dstream());
+ }
+
+ /**
+ * Save the JavaDStream to the Geode key-value store.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param func the PairFunction that converts elements of the JavaDStream to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
+ dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaDStream to the Geode key-value store using the default connection.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param func the PairFunction that converts elements of the JavaDStream to key/value pairs
+ * @param opConf the optional parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+ dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaDStream to the Geode key-value store.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param func the PairFunction that converts elements of the JavaDStream to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
+ dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the JavaDStream to the Geode key-value store using the default connection.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param func the PairFunction that converts elements of the JavaDStream to key/value pairs
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func) {
+ dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
new file mode 100644
index 0000000..2c83255
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
+ * to provide Geode Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
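+ *
+ * <p>A usage sketch (the factory method name and region path are assumed
+ * for illustration):
+ * <pre>
+ * {@code
+ * JavaPairDStream<String, Integer> pairs = ...;
+ * GeodeJavaUtil.javaFunctions(pairs).saveToGeode("str_int_region");
+ * }
+ * </pre></p>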
+ */
+public class GeodeJavaPairDStreamFunctions<K, V> {
+
+ public final GeodePairDStreamFunctions<K, V> dsf;
+
+ public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
+ this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream());
+ }
+
+ /**
+ * Save the JavaPairDStream to the Geode key-value store.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+ dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaPairDStream to the Geode key-value store using the default connection.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
+ dsf.saveToGeode(regionPath, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the JavaPairDStream to the Geode key-value store using the default connection.
+ * @param regionPath the full path of the region in which the DStream is stored
+ * @param opConf the optional parameters for this operation
+ */
+ public void saveToGeode(String regionPath, Properties opConf) {
+ dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaPairDStream to the Geode key-value store using the default connection.
+ * @param regionPath the full path of the region in which the DStream is stored
+ */
+ public void saveToGeode(String regionPath) {
+ dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+}