You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:02 UTC
[27/50] [abbrv] incubator-geode git commit: GEODE-1244: Revert rename
of package, directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
deleted file mode 100644
index b7a1dda..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector
-
-import java.util.Properties
-
-import io.pivotal.geode.spark.connector._
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
- var sc: SparkContext = null
- val numServers = 3
- val numObjects = 1000
-
- override def beforeAll() {
- // start geode cluster, and spark context
- val settings = new Properties()
- settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
- settings.setProperty("num-of-servers", numServers.toString)
- val locatorPort = GeodeCluster.start(settings)
-
- // start spark context in local mode
- IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
- "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
- val conf = new SparkConf()
- .setAppName("RDDJoinRegionIntegrationTest")
- .setMaster("local[2]")
- .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
- sc = new SparkContext(conf)
- }
-
- override def afterAll() {
- // stop connection, spark context, and geode cluster
- DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
- sc.stop()
- GeodeCluster.stop()
- }
-
-// def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
-// assert(map1.size == map2.size)
-// map1.foreach(e => {
-// assert(map2.contains(e._1))
-// assert (e._2 == map2.get(e._1).get)
-// })
-// }
-
- // --------------------------------------------------------------------------------------------
- // PairRDD.joinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
- // --------------------------------------------------------------------------------------------
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
- verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
- }
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
- verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
- }
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
- verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
- }
-
- def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => ("k_" + x, x*2))
- val rdd = sc.parallelize(data)
-
- val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, connConf)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
- // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
- // ------------------------------------------------------------------------------------------------------
- // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
- // -------------------------------------------------------------------------------------------------------
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
- verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
- }
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
- verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
- }
-
- test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
- verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
- }
-
- def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => (x, x*2))
- val rdd = sc.parallelize(data)
-
- val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
- val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, func /*, connConf*/)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
- // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
- // ------------------------------------------------------------------------------------------------
- // PairRDD.outerJoinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
- // ------------------------------------------------------------------------------------------------
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
- }
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
- }
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
- }
-
- def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => ("k_" + x, x*2))
- val rdd = sc.parallelize(data)
-
- val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath /*, connConf*/)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (-5 until 50).map {
- i => if (i < 0) ((s"k_$i", i * 2), None)
- else ((s"k_$i", i*2), Some(i))}.toMap
- // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
- // ------------------------------------------------------------------------------------------------------
- // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
- // -------------------------------------------------------------------------------------------------------
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
- }
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
- }
-
- test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
- verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
- }
-
- def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => (x, x*2))
- val rdd = sc.parallelize(data)
-
- val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
- val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, func, connConf)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (-5 until 50).map {
- i => if (i < 0) ((i, i * 2), None)
- else ((i, i*2), Some(i))}.toMap
- // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
- // --------------------------------------------------------------------------------------------
- // RDD.joinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
- // --------------------------------------------------------------------------------------------
-
- test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
- verifyRDDJoinRegion("rr_str_int_region")
- }
-
- test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
- verifyRDDJoinRegion("pr_str_int_region")
- }
-
- test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
- verifyRDDJoinRegion("pr_r_str_int_region")
- }
-
- def verifyRDDJoinRegion(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => s"k_$x")
- val rdd = sc.parallelize(data)
-
- val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, x => x, connConf)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
- // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
- // --------------------------------------------------------------------------------------------
- // RDD.outerJoinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
- // --------------------------------------------------------------------------------------------
-
- test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) {
- verifyRDDOuterJoinRegion("rr_str_int_region")
- }
-
- test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) {
- verifyRDDOuterJoinRegion("pr_str_int_region")
- }
-
- test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) {
- verifyRDDOuterJoinRegion("pr_r_str_int_region")
- }
-
- def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val data = (-5 until 50).map(x => s"k_$x")
- val rdd = sc.parallelize(data)
-
- val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, x => x /*, connConf */)
- val rdd2Content = rdd2.collect()
-
- val expectedMap = (-5 until 50).map {
- i => if (i < 0) (s"k_$i", None)
- else (s"k_$i", Some(i))}.toMap
- // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
- assert(expectedMap == rdd2Content.toMap)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
deleted file mode 100644
index 1ad843e..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/RetrieveRegionIntegrationTest.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector
-
-import java.util.Properties
-
-import io.pivotal.geode.spark.connector._
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-
-class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
- var sc: SparkContext = null
- val numServers = 4
- val numObjects = 1000
-
- override def beforeAll() {
- // start geode cluster, and spark context
- val settings = new Properties()
- settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
- settings.setProperty("num-of-servers", numServers.toString)
- val locatorPort = GeodeCluster.start(settings)
-
- // start spark context in local mode
- IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
- "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
- val conf = new SparkConf()
- .setAppName("RetrieveRegionIntegrationTest")
- .setMaster("local[2]")
- .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
- sc = new SparkContext(conf)
- }
-
- override def afterAll() {
- // stop connection, spark context, and geode cluster
- DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
- sc.stop()
- GeodeCluster.stop()
- }
-
- def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
- //Populate some data in the region
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[K, V] = conn.getRegionProxy(regionName)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
- verifyRetrieveRegion[K,V](regionName, entriesMap)
- }
-
- def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = {
- val rdd = sc.geodeRegion(regionName)
- val collectedObjs = rdd.collect()
- collectedObjs should have length entriesMap.size
- import scala.collection.JavaConverters._
- matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
- }
-
- def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
- assert(map1.size == map2.size)
- map1.foreach(e => {
- assert(map2.contains(e._1))
- assert (e._2 == map2.get(e._1).get)
- }
- )
- }
-
- //Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
- //This test has to run first...the rest of the tests always use the same num objects
- test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
- val numObjects = numServers - 1
- val entriesMap:JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
- executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
- }
-
- //Test for retrieving from region containing string key and int value
- def verifyRetrieveStringStringRegion(regionName:String) = {
- val entriesMap:JHashMap[String, String] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
- executeTest[String, String](regionName, numObjects, entriesMap)
- }
-
- test("Retrieve Region with replicate redundant string string") {
- verifyRetrieveStringStringRegion("rr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned string string") {
- verifyRetrieveStringStringRegion("pr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned redundant string string") {
- verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
- }
-
-
- //Test for retrieving from region containing string key and string value
- def verifyRetrieveStringIntRegion(regionName:String) = {
- val entriesMap:JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
- executeTest[String, Int](regionName, numObjects, entriesMap)
- }
-
- test("Retrieve Region with replicate string int region") {
- verifyRetrieveStringIntRegion("rr_str_int_region")
- }
-
- test("Retrieve Region with partitioned string int region") {
- verifyRetrieveStringIntRegion("pr_str_int_region")
- }
-
- test("Retrieve Region with partitioned redundant string int region") {
- verifyRetrieveStringIntRegion("pr_r_str_int_region")
- }
-
- //Tests for retrieving from region containing string key and object value
- def verifyRetrieveStringObjectRegion(regionName:String) = {
- val entriesMap:JHashMap[String, Object] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
- executeTest[String, Object](regionName, numObjects, entriesMap)
- }
-
- test("Retrieve Region with replicate string obj") {
- verifyRetrieveStringObjectRegion("rr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned string obj") {
- verifyRetrieveStringObjectRegion("pr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned redundant string obj") {
- verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
- }
-
- //Test for retrieving from region containing string key and map value
- def verifyRetrieveStringMapRegion(regionName:String) = {
- val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
- (0 until numObjects).map(i => {
- val hashMap:JHashMap[String, String] = new JHashMap()
- hashMap.put("mapKey:" + i, "mapValue:" + i)
- entriesMap.put("key_" + i, hashMap)
- })
- executeTest(regionName, numObjects, entriesMap)
- }
-
- test("Retrieve Region with replicate string map region") {
- verifyRetrieveStringMapRegion("rr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned string map region") {
- verifyRetrieveStringMapRegion("pr_obj_obj_region")
- }
-
- test("Retrieve Region with partitioned redundant string map region") {
- verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
- }
-
- //Test and helpers specific for retrieving from region containing string key and byte[] value
- def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
- //Populate some data in the region
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
- rgn.putAll(entriesMap)
- verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
- }
-
- def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = {
- val rdd = sc.geodeRegion(regionName)
- val collectedObjs = rdd.collect()
- collectedObjs should have length entriesMap.size
- import scala.collection.JavaConverters._
- matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
- }
-
- def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
- map1.foreach(e => {
- assert(map2.contains(e._1))
- assert (java.util.Arrays.equals(e._2, map2.get(e._1).get))
- }
- )
- assert(map1.size == map2.size)
-
- }
-
- def verifyRetrieveStringByteArrayRegion(regionName:String) = {
- val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
- executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
- }
-
- test("Retrieve Region with replicate region string byte[] region") {
- verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
- }
-
- test("Retrieve Region with partition region string byte[] region") {
- verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
- }
-
- test("Retrieve Region with partition redundant region string byte[] region") {
- verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
- }
-
- test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
- verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
- }
-
- test("Retrieve Region with where clause on partitioned region", FilterTest) {
- verifyRetrieveRegionWithWhereClause("pr_str_int_region")
- }
-
- test("Retrieve Region with where clause on replicated region", FilterTest) {
- verifyRetrieveRegionWithWhereClause("rr_str_int_region")
- }
-
- def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
- val entriesMap: JHashMap[String, Int] = new JHashMap()
- (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-
- val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
- val conn = connConf.getConnection
- val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
- rgn.removeAll(rgn.keySetOnServer())
- rgn.putAll(entriesMap)
-
- val rdd = sc.geodeRegion(regionPath).where("value.intValue() < 50")
- val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
- val collectedObjs = rdd.collect()
- // collectedObjs should have length expectedMap.size
- matchMaps[String, Int](expectedMap, collectedObjs.toMap)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
deleted file mode 100644
index b8571d8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark
-
-import org.scalatest.Tag
-
-package object connector {
-
- object OnlyTest extends Tag("OnlyTest")
- object FetchDataTest extends Tag("FetchDateTest")
- object FilterTest extends Tag("FilterTest")
- object JoinTest extends Tag("JoinTest")
- object OuterJoinTest extends Tag("OuterJoinTest")
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
deleted file mode 100644
index 18b2fd7..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeCluster.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.util.Properties
-
-trait GeodeCluster {
- def startGeodeCluster(settings: Properties): Int = {
- println("=== GeodeCluster start()")
- GeodeCluster.start(settings)
- }
-}
-
-object GeodeCluster {
- private var geode: Option[GeodeRunner] = None
-
- def start(settings: Properties): Int = {
- geode.map(_.stopGeodeCluster()) // Clean up any old running Geode instances
- val runner = new GeodeRunner(settings)
- geode = Some(runner)
- runner.getLocatorPort
- }
-
- def stop(): Unit = {
- println("=== GeodeCluster shutdown: " + geode.toString)
- geode match {
- case None => println("Nothing to shutdown.")
- case Some(runner) => runner.stopGeodeCluster()
- }
- geode = None
- println("=== GeodeCluster shutdown finished.")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
deleted file mode 100644
index 725a012..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/GeodeRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.io.{IOException, File}
-import java.net.InetAddress
-import java.util.Properties
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.IOFileFilter
-
-/**
-* A class that manages Geode locator and servers. Uses gfsh to
-* start and stop the locator and servers.
-*/
-class GeodeRunner(settings: Properties) {
- val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString
- val cacheXMLFile = settings.get("cache-xml-file")
- val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt
- val cwd = new File(".").getAbsolutePath
- val geodeFunctionsTargetDir = new File("../geode-functions/target")
- val testroot = "target/testgeode"
- val classpath = new File(cwd, "target/scala-2.10/it-classes/")
- val locatorPort = startGeodeCluster(numServers)
-
- def getLocatorPort: Int = locatorPort
-
- private def getCurrentDirectory = new File( "." ).getCanonicalPath
-
- private def startGeodeCluster(numServers: Int): Int = {
- //ports(0) for Geode locator, the other ports are for Geode servers
- val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
- startGeodeLocator(ports(0), ports(1))
- startGeodeServers(ports(0), ports.drop(2))
- registerFunctions(ports(1))
- ports(0)
- }
-
- private def startGeodeLocator(locatorPort: Int, jmxHttpPort:Int) {
- println(s"=== GeodeRunner: starting locator on port $locatorPort")
- val locatorDir = new File(cwd, s"$testroot/locator")
- if (locatorDir.exists())
- FileUtils.deleteDirectory(locatorDir)
- IOUtils.mkdir(locatorDir)
- new ProcessBuilder()
- .command(gfshCmd, "start", "locator",
- "--name=locator",
- s"--dir=$locatorDir",
- s"--port=$locatorPort",
- s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
- .inheritIO()
- .start()
-
- // Wait 30 seconds for locator to start
- println(s"=== GeodeRunner: waiting for locator on port $locatorPort")
- if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
- throw new IOException("Failed to start Geode locator.")
- println(s"=== GeodeRunner: done waiting for locator on port $locatorPort")
- }
-
- private def startGeodeServers(locatorPort: Int, serverPorts: Seq[Int]) {
- val procs = for (i <- 0 until serverPorts.length) yield {
- println(s"=== GeodeRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
- val serverDir = new File(cwd, s"$testroot/server${i+1}")
- if (serverDir.exists())
- FileUtils.deleteDirectory(serverDir)
- IOUtils.mkdir(serverDir)
- new ProcessBuilder()
- .command(gfshCmd, "start", "server",
- s"--name=server${i+1}",
- s"--locators=localhost[$locatorPort]",
- s"--bind-address=localhost",
- s"--server-port=${serverPorts(i)}",
- s"--dir=$serverDir",
- s"--cache-xml-file=$cacheXMLFile",
- s"--classpath=$classpath")
- .inheritIO()
- .start()
- }
- procs.foreach(p => p.waitFor)
- println(s"All $serverPorts.length servers have been started")
- }
-
- private def registerFunctions(jmxHttpPort:Int) {
- import scala.collection.JavaConversions._
- FileUtils.listFiles(geodeFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)}
- }
-
- def fileFilter = new IOFileFilter {
- def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("geode-functions")
- def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("geode-functions")
- }
-
- def dirFilter = new IOFileFilter {
- def accept (file: File) = file.getName.startsWith("scala")
- def accept (dir: File, name: String) = name.startsWith("scala")
- }
-
- private def registerFunction(jmxHttpPort:Int, jar:File) {
- println("Deploying:" + jar.getName)
- import io.pivotal.geode.spark.connector.GeodeFunctionDeployer
- val deployer = new GeodeFunctionDeployer(new HttpClient())
- deployer.deploy("localhost", jmxHttpPort, jar)
- }
-
- def stopGeodeCluster(): Unit = {
- stopGeodeServers(numServers)
- stopGeodeLocator()
- if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
- throw new IOException(s"Failed to stop Geode locator at port $getLocatorPort.")
- println(s"Successfully stop Geode locator at port $getLocatorPort.")
- }
-
- private def stopGeodeLocator() {
- println(s"=== GeodeRunner: stop locator")
- val p = new ProcessBuilder()
- .inheritIO()
- .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
- .start()
- p.waitFor()
- }
-
- private def stopGeodeServers(numServers: Int) {
- val procs = for (i <-1 to numServers) yield {
- println(s"=== GeodeRunner: stop server $i.")
- new ProcessBuilder()
- .inheritIO()
- .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
- .start()
- }
- procs.foreach(p => p.waitFor())
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
deleted file mode 100644
index 21a9232..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/testkit/IOUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector.testkit
-
-import java.io.{File, IOException}
-import java.net.{InetAddress, Socket}
-import com.gemstone.gemfire.internal.AvailablePort
-import scala.util.Try
-import org.apache.log4j.PropertyConfigurator
-import java.util.Properties
-
-object IOUtils {
-
- /** Makes a new directory or throws an `IOException` if it cannot be made */
- def mkdir(dir: File): File = {
- if (!dir.mkdirs())
- throw new IOException(s"Could not create dir $dir")
- dir
- }
-
- private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually {
- Try {
- Thread.sleep(100)
- new Socket(host, port).close()
- }
- }
-
- /**
- * Waits until a port at the given address is open or timeout passes.
- * @return true if managed to connect to the port, false if timeout happened first
- */
- def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- socketPortProb(host, port)
- .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout)
- .next()
- .isSuccess
- }
-
- /**
- * Waits until a port at the given address is close or timeout passes.
- * @return true if host:port is un-connect-able, false if timeout happened first
- */
- def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- socketPortProb(host, port)
- .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout)
- .next()
- .isFailure
- }
-
- /**
- * Returns array of unique randomly available tcp ports of specified count.
- */
- def getRandomAvailableTCPPorts(count: Int): Seq[Int] =
- (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET))
- .map{x => x.release(); x.getPort}.toArray
-
- /**
- * config a log4j properties used for integration tests
- */
- def configTestLog4j(level: String, props: (String, String)*): Unit = {
- val pro = new Properties()
- props.foreach(p => pro.put(p._1, p._2))
- configTestLog4j(level, pro)
- }
-
- def configTestLog4j(level: String, props: Properties): Unit = {
- val pro = new Properties()
- pro.put("log4j.rootLogger", s"$level, console")
- pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
- pro.put("log4j.appender.console.target", "System.err")
- pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
- pro.put("log4j.appender.console.layout.ConversionPattern",
- "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
- pro.putAll(props)
- PropertyConfigurator.configure(pro)
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
deleted file mode 100644
index 67f9e57..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.util.ManualClock
-
-object ManualClockHelper {
-
- def addToTime(ssc: StreamingContext, timeToAdd: Long): Unit = {
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(timeToAdd)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
deleted file mode 100644
index fce1e67..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.reflect.ClassTag
-
-class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
- extends InputDStream[T](ssc_) {
-
- def start() {}
-
- def stop() {}
-
- def compute(validTime: Time): Option[RDD[T]] = {
- logInfo("Computing RDD for time " + validTime)
- val index = ((validTime - zeroTime) / slideDuration - 1).toInt
- val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
- // lets us test cases where RDDs are not created
- if (selectedInput == null)
- return None
-
- val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
- logInfo("Created RDD " + rdd.id + " with " + selectedInput)
- Some(rdd)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
deleted file mode 100644
index e7c7cf9..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaDStreamFunctions<T> {
-
- public final GeodeDStreamFunctions<T> dsf;
-
- public GeodeJavaDStreamFunctions(JavaDStream<T> ds) {
- this.dsf = new GeodeDStreamFunctions<T>(ds.dstream());
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
- dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
- dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func) {
- dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
deleted file mode 100644
index 2c83255..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairDStreamFunctions<K, V> {
-
- public final GeodePairDStreamFunctions<K, V> dsf;
-
- public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
- this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream());
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
- dsf.saveToGeode(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param opConf the optional parameters for this operation
- */
- public void saveToGeode(String regionPath, Properties opConf) {
- dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- */
- public void saveToGeode(String regionPath) {
- dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
deleted file mode 100644
index 3278a5b..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodePairRDDFunctions;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import scala.Option;
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairRDDFunctions<K, V> {
-
- public final GeodePairRDDFunctions<K, V> rddf;
-
- public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
- this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the parameters for this operation
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param opConf the parameters for this operation
- */
- public void saveToGeode(String regionPath, Properties opConf) {
- rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
- rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the pair RDD to Geode key-value store with the default GeodeConnector.
- * @param regionPath the full path of region that the RDD is stored
- */
- public void saveToGeode(String regionPath) {
- rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
- return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, GeodeConnectionConf connConf) {
- GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
- GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key k.
- *
- * @param regionPath the region path of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
- return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key k.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
deleted file mode 100644
index e4f6f36..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodeRDDFunctions;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Option;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaRDDFunctions<T> {
-
- public final GeodeRDDFunctions<T> rddf;
-
- public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
- this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
- rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
- rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store with default GeodeConnector.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- */
- public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
- rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
- return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
- String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
- GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the Geode region, or the pair
- * (t, None) if no element in the Geode region have key `func(t)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) {
- return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the Geode region, or the pair
- * (t, None) if no element in the Geode region have key `func(t)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
- String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<Option<V>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
deleted file mode 100644
index 3471bf90..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-
-/**
- * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
- * OQL functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSQLContextFunctions {
-
- public final GeodeSQLContextFunctions scf;
-
- public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
- scf = new GeodeSQLContextFunctions(sqlContext);
- }
-
- public <T> DataFrame geodeOQL(String query) {
- DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
- return df;
- }
-
- public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
- DataFrame df = scf.geodeOQL(query, connConf);
- return df;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
deleted file mode 100644
index ce6b1ff..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
-import org.apache.spark.SparkContext;
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-import scala.reflect.ClassTag;
-import java.util.Properties;
-
-/**
- * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
- * Connector functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSparkContextFunctions {
-
- public final SparkContext sc;
-
- public GeodeJavaSparkContextFunctions(SparkContext sc) {
- this.sc = sc;
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf that can be used to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
- String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- ClassTag<K> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply(
- sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
- return new GeodeJavaRegionRDD<>(rdd);
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner.
- * @param regionPath the full path of the region
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
- GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
- return geodeRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf that can be used to access the region
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
- return geodeRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with default GeodeConnector.
- * @param regionPath the full path of the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
- GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
- return geodeRegion(regionPath, connConf, opConf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
deleted file mode 100644
index 41fe7e5..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import scala.Tuple2;
-
-import io.pivotal.geode.spark.connector.package$;
-
-/**
- * The main entry point to Spark Geode Connector Java API.
- *
- * There are several helpful static factory methods which build useful wrappers
- * around Spark Context, Streaming Context and RDD. There are also helper methods
- * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
- */
-public final class GeodeJavaUtil {
-
- /** constants */
- public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
- // partitioner related keys and values
- public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
- public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
- public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
- public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
- public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
- public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
-
- /** The private constructor is used prevents user from creating instance of this class. */
- private GeodeJavaUtil() { }
-
- /**
- * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
- * on an existing {@link SparkContext} instance.
- */
- public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
- return new GeodeJavaSparkContextFunctions(sc);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
- * on an existing {@link JavaSparkContext} instance.
- */
- public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
- return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
- */
- public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
- return new GeodeJavaPairRDDFunctions<K, V>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaRDD} instance.
- */
- public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
- return new GeodeJavaRDDFunctions<T>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
- */
- public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
- return new GeodeJavaPairDStreamFunctions<>(ds);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
- */
- public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
- return new GeodeJavaDStreamFunctions<>(ds);
- }
-
- /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>.
- */
- public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
- return JavaAPIHelper.toJavaPairRDD(rdd);
- }
-
- /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>.
- */
- public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
- return JavaAPIHelper.toJavaPairDStream(ds);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
- * on an existing {@link SQLContext} instance.
- */
- public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
- return new GeodeJavaSQLContextFunctions(sqlContext);
- }
-
-}