You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:16:48 UTC
[13/50] [abbrv] incubator-geode git commit: GEODE-1244: Package,
directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
deleted file mode 100644
index 48f83c9..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, GemFireRegionRDD}
-import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, GemFireConnection}
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import org.mockito.Mockito._
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-
-import scala.reflect.ClassTag
-
-class GemFireRegionRDDTest extends FunSuite with Matchers with MockitoSugar {
-
- /** create common mocks, not all mocks are used by all tests */
- def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
- : (String, Region[K,V], GemFireConnectionConf, GemFireConnection) = {
- val mockConnection = mock[GemFireConnection]
- val mockRegion = mock[Region[K, V]]
- val mockConnConf = mock[GemFireConnectionConf]
- when(mockConnConf.getConnection).thenReturn(mockConnection)
- when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
- when(mockConnConf.locators).thenReturn(Seq.empty)
- (regionPath, mockRegion, mockConnConf, mockConnection)
- }
-
- test("create GemFireRDD with non-existing region") {
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- when(mockConnConf.getConnection).thenReturn(mockConnection)
- when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException)
- val mockSparkContext = mock[SparkContext]
- intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) }
- verify(mockConnConf).getConnection
- verify(mockConnection).validateRegion[String, String](regionPath)
- }
-
- test("getPartitions with non-existing region") {
- // region exists when RDD is created, but get removed before getPartitions() is invoked
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None)
- val mockSparkContext = mock[SparkContext]
- intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions }
- }
-
- test("getPartitions with replicated region and not preferred env") {
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- implicit val mockConnConf2 = mockConnConf
- val mockSparkContext = mock[SparkContext]
- when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
- val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
- verifySinglePartition(partitions)
- }
-
- def verifySinglePartition(partitions: Array[Partition]): Unit = {
- assert(1 == partitions.size)
- assert(partitions(0).index === 0)
- assert(partitions(0).isInstanceOf[GemFireRDDPartition])
- assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty)
- }
-
- test("getPartitions with replicated region and preferred OnePartitionPartitioner") {
- // since it's replicated region, so OnePartitionPartitioner will be used, i.e., override preferred partitioner
- import io.pivotal.gemfire.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName}
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
- implicit val mockConnConf2 = mockConnConf
- val mockSparkContext = mock[SparkContext]
- val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName)
- val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions
- verifySinglePartition(partitions)
- }
-
- test("getPartitions with partitioned region and not preferred env") {
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- implicit val mockConnConf2 = mockConnConf
- val mockSparkContext = mock[SparkContext]
- when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
- val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
- verifySinglePartition(partitions)
- }
-
- test("GemFireRDD.compute() method") {
- val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
- implicit val mockConnConf2 = mockConnConf
- val mockIter = mock[Iterator[(String, String)]]
- val partition = GemFireRDDPartition(0, Set.empty)
- when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
- when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter)
- val mockSparkContext = mock[SparkContext]
- val rdd = GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf)
- val partitions = rdd.partitions
- assert(1 == partitions.size)
- val mockTaskContext = mock[TaskContext]
- rdd.compute(partitions(0), mockTaskContext)
- verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition))
- // verify(mockConnection).getRegionData[String, String](regionPath, Set.empty.asInstanceOf[Set[Int]], "gemfireRDD 0.0")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
deleted file mode 100644
index 03e15a0..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import java.io.Serializable;
-
-/**
- * This is a demo class used in doc/?.md
- */
-public class Emp implements Serializable {
-
- private int id;
-
- private String lname;
-
- private String fname;
-
- private int age;
-
- private String loc;
-
- public Emp(int id, String lname, String fname, int age, String loc) {
- this.id = id;
- this.lname = lname;
- this.fname = fname;
- this.age = age;
- this.loc = loc;
- }
-
- public int getId() {
- return id;
- }
-
- public String getLname() {
- return lname;
- }
-
- public String getFname() {
- return fname;
- }
-
- public int getAge() {
- return age;
- }
-
- public String getLoc() {
- return loc;
- }
-
- @Override
- public String toString() {
- return "Emp(" + id + ", " + lname + ", " + fname + ", " + age + ", " + loc + ")";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Emp emp = (Emp) o;
-
- if (age != emp.age) return false;
- if (id != emp.id) return false;
- if (fname != null ? !fname.equals(emp.fname) : emp.fname != null) return false;
- if (lname != null ? !lname.equals(emp.lname) : emp.lname != null) return false;
- if (loc != null ? !loc.equals(emp.loc) : emp.loc != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = id;
- result = 31 * result + (lname != null ? lname.hashCode() : 0);
- result = 31 * result + (fname != null ? fname.hashCode() : 0);
- result = 31 * result + age;
- result = 31 * result + (loc != null ? loc.hashCode() : 0);
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
deleted file mode 100644
index 41654a5..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-
-/**
- * This Spark application demonstrates how to get region data from GemFire using GemFire
- * OQL Java API. The result is a Spark DataFrame.
- * <p>
- * In order to run it, you will need to start a GemFire cluster, and run demo PairRDDSaveJavaDemo
- * first to create some data in the region.
- * <p>
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/.
- * Then run the following command to start a Spark job:
- * <pre>
- * <path to spark>/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \
- * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- */
-public class OQLJavaDemo {
-
- public static void main(String[] argv) {
-
- if (argv.length != 1) {
- System.err.printf("Usage: OQLJavaDemo <locators>\n");
- return;
- }
-
- SparkConf conf = new SparkConf().setAppName("OQLJavaDemo");
- conf.set(GemFireLocatorPropKey, argv[0]); // "192.168.1.47[10335]"
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region");
- System.out.println("======= DataFrame =======\n");
- df.show();
- sc.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
deleted file mode 100644
index 84f87af..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
-import java.util.*;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark
- * Connector with Java.
- * <p/>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_str_region --type=REPLICATE \
- * --key-constraint=java.lang.String --value-constraint=java.lang.String
- * </pre>
- *
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/.
- * Then run the following command to start a Spark job:
- * <pre>
- * <path to spark>/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \
- * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- *
- * Verify the data was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_str_region.entrySet" </pre>
- */
-public class PairRDDSaveJavaDemo {
-
- public static void main(String[] argv) {
-
- if (argv.length != 1) {
- System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n");
- return;
- }
-
- SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo");
- conf.set(GemFireLocatorPropKey, argv[0]);
- JavaSparkContext sc = new JavaSparkContext(conf);
- GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf);
-
- List<Tuple2<String, String>> data = new ArrayList<>();
- data.add(new Tuple2<>("7", "seven"));
- data.add(new Tuple2<>("8", "eight"));
- data.add(new Tuple2<>("9", "nine"));
-
- List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
- data2.add(new Tuple2<>("11", "eleven"));
- data2.add(new Tuple2<>("12", "twelve"));
- data2.add(new Tuple2<>("13", "thirteen"));
-
- // method 1: generate JavaPairRDD directly
- JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data);
- javaFunctions(rdd1).saveToGemfire("str_str_region", connConf);
-
- // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V>
- JavaRDD<Tuple2<String, String>> rdd2 = sc.parallelize(data2);
- javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region", connConf);
-
- sc.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
deleted file mode 100644
index 5fc5aeb..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark
- * Connector with Java.
- * <p/>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_int_region --type=REPLICATE \
- * --key-constraint=java.lang.String --value-constraint=java.lang.Integer
- * </pre>
- *
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/.
- * Then run the following command to start a Spark job:
- * <pre>
- * <path to spark>/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \
- * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- *
- * Verify the data was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre>
- */
-public class RDDSaveJavaDemo {
-
- public static void main(String[] argv) {
-
- if (argv.length != 1) {
- System.err.printf("Usage: RDDSaveJavaDemo <locators>\n");
- return;
- }
-
- SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo");
- conf.set(GemFireLocatorPropKey, argv[0]);
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- List<String> data = new ArrayList<String>();
- data.add("abcdefg");
- data.add("abcdefgh");
- data.add("abcdefghi");
- JavaRDD<String> rdd = sc.parallelize(data);
-
- GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf);
-
- PairFunction<String, String, Integer> func = new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, s.length());
- }
- };
-
- javaFunctions(rdd).saveToGemfire("str_int_region", func, connConf);
-
- sc.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
deleted file mode 100644
index 7c1d7bb..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to expose a region in GemFire as a RDD using GemFire
- * Spark Connector with Java.
- * <p>
- * In order to run it, you will need to start GemFire cluster, and run demo PairRDDSaveJavaDemo
- * first to create some data in the region.
- * <p>
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/.
- * Then run the following command to start a Spark job:
- * <pre>
- * <path to spark>/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \
- * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- */
-public class RegionToRDDJavaDemo {
-
- public static void main(String[] argv) {
-
- if (argv.length != 1) {
- System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n");
- return;
- }
-
- SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo");
- conf.set(GemFireLocatorPropKey, argv[0]);
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- JavaPairRDD<String, String> rdd = javaFunctions(sc).gemfireRegion("str_str_region");
- System.out.println("=== gemfireRegion =======\n" + rdd.collect() + "\n=========================");
-
- sc.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
deleted file mode 100644
index f67c32e..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import io.pivotal.gemfire.spark.connector.GemFireLocatorPropKey
-import io.pivotal.gemfire.spark.connector.streaming._
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * <p><p>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_int_region --type=REPLICATE \
- * --key-constraint=java.lang.String --value-constraint=java.lang.Integer
- * </pre>
- *
- * <p>To run this on your local machine, you need to first run a net cat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port`
- *
- * <p><p> check result that was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre>
- */
-object NetworkWordCount {
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <hostname> <port> <gemfire locator>")
- System.exit(1)
- }
-
- val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- val currentCount = values.foldLeft(0)(_ + _)
- val previousCount = state.getOrElse(0)
- Some(currentCount + previousCount)
- }
-
- // Create the context with a 1 second batch size
- val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GemFireLocatorPropKey, args(2))
- val ssc = new StreamingContext(sparkConf, Seconds(1))
- ssc.checkpoint(".")
-
- // Create a socket stream on target ip:port and count the
- // words in input stream of \n delimited text (eg. generated by 'nc')
- // Note that no duplication in storage level only for running locally.
- // Replication necessary in distributed scenario for fault tolerance.
- val lines = ssc.socketTextStream(args(0), args(1).toInt)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
- // runningCounts.print()
- runningCounts.saveToGemfire("str_int_region")
- ssc.start()
- ssc.awaitTermination()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
new file mode 100644
index 0000000..9fba9e1
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+ private String name;
+
+ private int age;
+
+ public Employee(String n, int a) {
+ name = n;
+ age = a;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public String toString() {
+ return new StringBuilder().append("Employee[name=").append(name).
+ append(", age=").append(age).
+ append("]").toString();
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof Employee) {
+ return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+ }
+ return false;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
new file mode 100644
index 0000000..8f5a045
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import com.gemstone.gemfire.cache.Region;
+import io.pivotal.geode.spark.connector.GeodeConnection;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf$;
+import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager$;
+import io.pivotal.geode.spark.connector.javaapi.GeodeJavaRegionRDD;
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster$;
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import io.pivotal.geode.spark.connector.package$;
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+
+import java.util.*;
+
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey;
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions;
+import static org.junit.Assert.*;
+
+public class JavaApiIntegrationTest extends JUnitSuite {
+
+ static JavaSparkContext jsc = null;
+ static GeodeConnectionConf connConf = null;
+
+ static int numServers = 2;
+ static int numObjects = 1000;
+ static String regionPath = "pr_str_int_region";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // start geode cluster, and spark context
+ Properties settings = new Properties();
+ settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml");
+ settings.setProperty("num-of-servers", Integer.toString(numServers));
+ int locatorPort = GeodeCluster$.MODULE$.start(settings);
+
+ // start spark context in local mode
+ Properties props = new Properties();
+ props.put("log4j.logger.org.apache.spark", "INFO");
+ props.put("log4j.logger.io.pivotal.geode.spark.connector","DEBUG");
+ IOUtils.configTestLog4j("ERROR", props);
+ SparkConf conf = new SparkConf()
+ .setAppName("RetrieveRegionIntegrationTest")
+ .setMaster("local[2]")
+ .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ locatorPort);
+ // sc = new SparkContext(conf);
+ jsc = new JavaSparkContext(conf);
+ connConf = GeodeConnectionConf.apply(jsc.getConf());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ // stop connection, spark context, and geode cluster
+ DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf()));
+ jsc.stop();
+ GeodeCluster$.MODULE$.stop();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // utility methods
+ // --------------------------------------------------------------------------------------------
+
+ private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) {
+ assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size());
+ for (Tuple2<K, V> p : list) {
+ assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()),
+ p._2().equals(map.get(p._1())));
+ }
+ }
+
+ private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) {
+ HashMap<String, Integer> entriesMap = new HashMap<>();
+ for (int i = start; i < stop; i ++) {
+ entriesMap.put("k_" + i, i);
+ }
+
+ GeodeConnection conn = connConf.getConnection();
+ Region<String, Integer> region = conn.getRegionProxy(regionPath);
+ region.removeAll(region.keySetOnServer());
+ region.putAll(entriesMap);
+ return region;
+ }
+
+ private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) {
+ List<Tuple2<String, Integer>> data = new ArrayList<>();
+ for (int i = start; i < stop; i ++) {
+ data.add(new Tuple2<>("k_" + i, i));
+ }
+ return jsc.parallelizePairs(data);
+ }
+
+ private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) {
+ List<Tuple2<Integer, Integer>> data = new ArrayList<>();
+ for (int i = start; i < stop; i ++) {
+ data.add(new Tuple2<>(i, i * 2));
+ }
+ return jsc.parallelizePairs(data);
+ }
+
+ private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
+ List<Integer> data = new ArrayList<>();
+ for (int i = start; i < stop; i ++) {
+ data.add(i);
+ }
+ return jsc.parallelize(data);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaRDD.saveToGeode
+ // --------------------------------------------------------------------------------------------
+
+ static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> {
+ @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
+ return new Tuple2<>("k_" + x, x);
+ }
+ }
+
+ @Test
+ public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
+ verifyRDDSaveToGeode(true, true);
+ }
+
+ @Test
+ public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception {
+ verifyRDDSaveToGeode(true, false);
+ }
+
+ @Test
+ public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
+ verifyRDDSaveToGeode(false, true);
+ }
+
+ @Test
+ public void testRDDSaveToGeodeWithConnConf() throws Exception {
+ verifyRDDSaveToGeode(false, false);
+ }
+
+ public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+ Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries
+ JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
+
+ PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
+ Properties opConf = new Properties();
+ opConf.put(RDDSaveBatchSizePropKey, "200");
+
+ if (useDefaultConnConf) {
+ if (useOpConf)
+ javaFunctions(rdd1).saveToGeode(regionPath, func, opConf);
+ else
+ javaFunctions(rdd1).saveToGeode(regionPath, func);
+ } else {
+ if (useOpConf)
+ javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf);
+ else
+ javaFunctions(rdd1).saveToGeode(regionPath, func, connConf);
+ }
+
+ Set<String> keys = region.keySetOnServer();
+ Map<String, Integer> map = region.getAll(keys);
+
+ List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+
+ for (int i = 0; i < numObjects; i ++) {
+ expectedList.add(new Tuple2<>("k_" + i, i));
+ }
+ matchMapAndPairList(map, expectedList);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaPairRDD.saveToGeode
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
+ verifyPairRDDSaveToGeode(true, true);
+ }
+
+ @Test
+ public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception {
+ verifyPairRDDSaveToGeode(true, false);
+ }
+
+ @Test
+ public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
+ verifyPairRDDSaveToGeode(false, true);
+ }
+
+ @Test
+ public void testPairRDDSaveToGeodeWithConnConf() throws Exception {
+ verifyPairRDDSaveToGeode(false, false);
+ }
+
+ public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+ Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries
+ JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
+ Properties opConf = new Properties();
+ opConf.put(RDDSaveBatchSizePropKey, "200");
+
+ if (useDefaultConnConf) {
+ if (useOpConf)
+ javaFunctions(rdd1).saveToGeode(regionPath, opConf);
+ else
+ javaFunctions(rdd1).saveToGeode(regionPath);
+ } else {
+ if (useOpConf)
+ javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf);
+ else
+ javaFunctions(rdd1).saveToGeode(regionPath, connConf);
+ }
+
+ Set<String> keys = region.keySetOnServer();
+ Map<String, Integer> map = region.getAll(keys);
+
+ List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+ for (int i = 0; i < numObjects; i ++) {
+ expectedList.add(new Tuple2<>("k_" + i, i));
+ }
+ matchMapAndPairList(map, expectedList);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaSparkContext.geodeRegion and where clause
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testJavaSparkContextGeodeRegion() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects); // remove all entries
+ Properties emptyProps = new Properties();
+ GeodeJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).geodeRegion(regionPath);
+ GeodeJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).geodeRegion(regionPath, emptyProps);
+ GeodeJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).geodeRegion(regionPath, connConf);
+ GeodeJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps);
+ GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50");
+
+ HashMap<String, Integer> expectedMap = new HashMap<>();
+ for (int i = 0; i < numObjects; i ++) {
+ expectedMap.put("k_" + i, i);
+ }
+
+ matchMapAndPairList(expectedMap, rdd1.collect());
+ matchMapAndPairList(expectedMap, rdd2.collect());
+ matchMapAndPairList(expectedMap, rdd3.collect());
+ matchMapAndPairList(expectedMap, rdd4.collect());
+
+ HashMap<String, Integer> expectedMap2 = new HashMap<>();
+ for (int i = 0; i < 50; i ++) {
+ expectedMap2.put("k_" + i, i);
+ }
+
+ matchMapAndPairList(expectedMap2, rdd5.collect());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaPairRDD.joinGeodeRegion
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testPairRDDJoinWithSameKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+ JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath);
+ JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf);
+ // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
+ for (int i = 0; i < 10; i ++) {
+ expectedMap.put(new Tuple2<>("k_" + i, i), i);
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+ static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> {
+ @Override public String call(Tuple2<Integer, Integer> pair) throws Exception {
+ return "k_" + pair._1();
+ }
+ }
+
+ @Test
+ public void testPairRDDJoinWithDiffKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+ Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+ JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
+ JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
+ //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
+ for (int i = 0; i < 10; i ++) {
+ expectedMap.put(new Tuple2<>(i, i * 2), i);
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaPairRDD.outerJoinGeodeRegion
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+ JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath);
+ JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf);
+ //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+ for (int i = -5; i < 10; i ++) {
+ if (i < 0)
+ expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null));
+ else
+ expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+ @Test
+ public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+ Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+ JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
+ JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
+ //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+ for (int i = -5; i < 10; i ++) {
+ if (i < 0)
+ expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
+ else
+ expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaRDD.joinGeodeRegion
+ // --------------------------------------------------------------------------------------------
+
+ static class IntToStrKeyFunction implements Function<Integer, String> {
+ @Override public String call(Integer x) throws Exception {
+ return "k_" + x;
+ }
+ }
+
+ @Test
+ public void testRDDJoinWithSameKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+ Function<Integer, String> func = new IntToStrKeyFunction();
+ JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
+ JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
+ //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Integer, Integer> expectedMap = new HashMap<>();
+ for (int i = 0; i < 10; i ++) {
+ expectedMap.put(i, i);
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // JavaRDD.outerJoinGeodeRegion
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testRDDOuterJoinWithSameKeyType() throws Exception {
+ prepareStrIntRegion(regionPath, 0, numObjects);
+ JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+ Function<Integer, String> func = new IntToStrKeyFunction();
+ JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
+ JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
+ //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+ HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
+ for (int i = -5; i < 10; i ++) {
+ if (i < 0)
+ expectedMap.put(i, Option.apply((Integer) null));
+ else
+ expectedMap.put(i, Some.apply(i));
+ }
+ matchMapAndPairList(expectedMap, rdd2a.collect());
+ matchMapAndPairList(expectedMap, rdd2b.collect());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
new file mode 100644
index 0000000..1457db9
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * A stock portfolio that consists of multiple {@link Position} objects that
+ * represent shares of stock (a "security"). Instances of
+ * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and
+ * their contents can be queried using the Geode query service.
+ * </p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system. Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ * </p>
+ *
+ */
+public class Portfolio implements Declarable, Serializable {
+
+ private static final long serialVersionUID = 9097335119586059309L;
+
+ private int id; /* id is used as the entry key and is stored in the entry */
+ private String type;
+ private Map<String,Position> positions = new LinkedHashMap<String,Position>();
+ private String status;
+
+ public Portfolio(Properties props) {
+ init(props);
+ }
+
+ @Override
+ public void init(Properties props) {
+ this.id = Integer.parseInt(props.getProperty("id"));
+ this.type = props.getProperty("type", "type1");
+ this.status = props.getProperty("status", "active");
+
+ // get the positions. These are stored in the properties object
+ // as Positions, not String, so use Hashtable protocol to get at them.
+ // the keys are named "positionN", where N is an integer.
+ for (Map.Entry<Object, Object> entry: props.entrySet()) {
+ String key = (String)entry.getKey();
+ if (key.startsWith("position")) {
+ Position pos = (Position)entry.getValue();
+ this.positions.put(pos.getSecId(), pos);
+ }
+ }
+ }
+
+ public void setType(String t) {this.type = t; }
+
+ public String getStatus(){
+ return status;
+ }
+
+ public int getId(){
+ return this.id;
+ }
+
+ public Map<String,Position> getPositions(){
+ return this.positions;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public boolean isActive(){
+ return status.equals("active");
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder buf = new StringBuilder();
+ buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status);
+ buf.append(" type=" + this.type);
+ boolean firstTime = true;
+ for (Map.Entry<String, Position> entry: positions.entrySet()) {
+ if (!firstTime) {
+ buf.append(", ");
+ }
+ buf.append("\n\t\t");
+ buf.append(entry.getKey() + ":" + entry.getValue());
+ firstTime = false;
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
new file mode 100644
index 0000000..d6f8d1f
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * Represents a number of shares of a stock ("security") held in a {@link
+ * Portfolio}.
+ * </p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system. Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ * </p>
+ *
+ */
+public class Position implements Declarable, Serializable {
+
+ private static final long serialVersionUID = -8229531542107983344L;
+
+ private String secId;
+ private double qty;
+ private double mktValue;
+
+ public Position(Properties props) {
+ init(props);
+ }
+
+ @Override
+ public void init(Properties props) {
+ this.secId = props.getProperty("secId");
+ this.qty = Double.parseDouble(props.getProperty("qty"));
+ this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
+ }
+
+ public String getSecId(){
+ return this.secId;
+ }
+
+ public double getQty(){
+ return this.qty;
+ }
+
+ public double getMktValue() {
+ return this.mktValue;
+ }
+
+ @Override
+ public String toString(){
+ return new StringBuilder()
+ .append("Position [secId=").append(secId)
+ .append(" qty=").append(this.qty)
+ .append(" mktValue=").append(mktValue).append("]").toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
new file mode 100644
index 0000000..79893d6
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+ "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+ "http://www.gemstone.com/dtd/cache6_5.dtd" >
+
+<cache>
+ <!-- test region for OQL test -->
+ <region name="obj_obj_region" refid="PARTITION_REDUNDANT" />
+
+ <region name="obj_obj_rep_region" refid="REPLICATE" />
+
+ <region name="str_int_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="str_str_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.String</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="str_str_rep_region" refid="REPLICATE">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.String</value-constraint>
+ </region-attributes>
+ </region>
+</cache>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
new file mode 100644
index 0000000..3023959
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+ "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+ "http://www.gemstone.com/dtd/cache6_5.dtd" >
+
+<cache>
+ <!-- combinations of key, value types with region types -->
+ <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" />
+ <region name="pr_obj_obj_region" refid="PARTITION" />
+ <region name="rr_obj_obj_region" refid="REPLICATE" />
+ <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" />
+
+ <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="pr_str_int_region" refid="PARTITION">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="rr_str_int_region" refid="REPLICATE">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+
+ <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT">
+ <region-attributes>
+ <key-constraint>java.lang.String</key-constraint>
+ <value-constraint>java.lang.Integer</value-constraint>
+ </region-attributes>
+ </region>
+</cache>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
new file mode 100644
index 0000000..a26bcbd
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector
+
+import java.util.Properties
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import io.pivotal.geode.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager}
+import io.pivotal.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD}
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import scala.collection.JavaConversions
+import scala.reflect.ClassTag
+
+case class Number(str: String, len: Int)
+
+class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
+
+ var sc: SparkContext = null
+
+ override def beforeAll() {
+ // start geode cluster, and spark context
+ val settings = new Properties()
+ settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
+ settings.setProperty("num-of-servers", "2")
+ val locatorPort = GeodeCluster.start(settings)
+
+ // start spark context in local mode
+ IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+ "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG")
+ val conf = new SparkConf()
+ .setAppName("BasicIntegrationTest")
+ .setMaster("local[2]")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator")
+
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ // stop connection, spark context, and geode cluster
+ DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
+ sc.stop()
+ GeodeCluster.stop()
+ }
+
+ //Convert Map[Object, Object] to java.util.Properties
+ private def map2Props(map: Map[Object, Object]): java.util.Properties =
+ (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
+
+ // ===========================================================
+ // DefaultGeodeConnection functional tests
+ // ===========================================================
+
+ test("DefaultGeodeConnection.validateRegion()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ // normal exist-region
+ var regionPath: String = "str_str_region"
+ conn.validateRegion[String, String](regionPath)
+
+ // non-exist region
+ regionPath = "non_exist_region"
+ try {
+ conn.validateRegion[String, String](regionPath)
+ fail("validateRegion failed to catch non-exist region error")
+ } catch {
+ case e: RuntimeException =>
+ if (! e.getMessage.contains(s"The region named $regionPath was not found"))
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ case e: Throwable =>
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ }
+
+ // Note: currently, can't catch type mismatch error
+ conn.validateRegion[String, Integer]("str_str_region")
+ }
+
+ test("DefaultGeodeConnection.getRegionMetadata()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ // exist region
+ validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
+ validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
+ validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
+
+ // non-exist region
+ assert(! conn.getRegionMetadata("no_exist_region").isDefined)
+ }
+
+ def validateRegionMetadata(
+ conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int,
+ keyType: String, valueType: String, emptyMap: Boolean): Unit = {
+
+ val mdOption = conn.getRegionMetadata(regionPath)
+ val md = mdOption.get
+
+ assert(md.getRegionPath == s"/$regionPath")
+ assert(md.isPartitioned == partitioned)
+ assert(md.getKeyTypeName == keyType)
+ assert(md.getValueTypeName == valueType)
+ assert(md.getTotalBuckets == buckets)
+ if (emptyMap) assert(md.getServerBucketMap == null)
+ else assert(md.getServerBucketMap != null)
+ }
+
+ test("DefaultGeodeConnection.getRegionProxy()") {
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+
+ val region1 = conn.getRegionProxy[String, String]("str_str_region")
+ region1.put("1", "One")
+ assert(region1.get("1") == "One")
+ region1.remove("1")
+ assert(region1.get("1") == null)
+
+ // getRegionProxy doesn't fail when region doesn't exist
+ val region2 = conn.getRegionProxy[String, String]("non_exist_region")
+ try {
+ region2.put("1", "One")
+ fail("getRegionProxy failed to catch non-exist region error")
+ } catch {
+ case e: Exception =>
+ if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) {
+ e.printStackTrace()
+ fail("validateRegion gives wrong exception on non-exist region", e)
+ }
+ }
+ }
+
+ // Note: DefaultGeodeConnection.getQuery() and getRegionData() are covered by
+ // RetrieveRegionIntegrationTest.scala and following OQL tests.
+
+ // ===========================================================
+ // OQL functional tests
+ // ===========================================================
+
+ private def initRegion(regionName: String): Unit = {
+
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+
+ //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String]
+ var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
+ var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
+ val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("1", portfolio1)
+
+ position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925"))
+ position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972"))
+ val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("2", portfolio2)
+
+ position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
+ position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
+ val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("3", portfolio3)
+
+ position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("4", portfolio4)
+
+ position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("5", portfolio5)
+
+ position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572"))
+ position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34"))
+ val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive",
+ "position1" -> position1, "position2" -> position2)))
+ rgn.put("6", portfolio6)
+
+ position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
+ position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
+ val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active",
+ "position1" -> position1, "position2" -> position2)))
+ //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug
+ //portfolio7.setType(null)
+ rgn.put("7", portfolio7)
+ }
+
+ private def getQueryRDD[T: ClassTag](
+ query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] =
+ new QueryRDD[T](sc, query, connConf)
+
+ test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") {
+ simpleQuery("obj_obj_region")
+ }
+
+ test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
+ simpleQuery("obj_obj_rep_region")
+ }
+
+ private def simpleQuery(regionName: String) {
+ //Populate some data in the region
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")
+
+ //verify the QueryRDD
+ val oqlRS: Array[String] = OQLResult.collect()
+ oqlRS should have length 3
+ oqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ // this is used to implicitly convert an RDD to a DataFrame.
+ import sqlContext.implicits._
+ val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
+ //Register dataFrame as a table of two columns of type String and Int respectively
+ dataFrame.registerTempTable("numberTable")
+
+ //Issue SQL query against the table
+ val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+ //Verify the SQL query result, r(0) means column 0
+ val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 3
+ sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+ //Register dataFrame2 as a table of two columns of type String and Int respectively
+ dataFrame2.registerTempTable("numberTable2")
+
+ //Issue SQL query against the table
+ val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
+ //Verify the SQL query result, r(0) means column 0
+ val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
+ sqlRS2 should have length 3
+ sqlRS2 should contain theSameElementsAs List("one", "two", "three")
+
+ //Remove the region entries, because other tests might use the same region as well
+ List("1", "2", "3").foreach(rgn.remove)
+ }
+
+ test("Run Geode OQL query and directly return DataFrame: Partitioned Region") {
+ simpleQueryDataFrame("obj_obj_region")
+ }
+
+ test("Run Geode OQL query and directly return DataFrame: Replicated Region") {
+ simpleQueryDataFrame("obj_obj_rep_region")
+ }
+
+ private def simpleQueryDataFrame(regionName: String) {
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName")
+ dataFrame.registerTempTable("numberTable")
+
+ //Issue SQL query against the table
+ val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+ //Verify the SQL query result, r(0) means column 0
+ val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 3
+ sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+ //Remove the region entries, because other tests might use the same region as well
+ List("1", "2", "3").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with UDT: Partitioned Region") {
+ queryUDT("obj_obj_region")
+ }
+
+ test("Geode OQL query with UDT: Replicated Region") {
+ queryUDT("obj_obj_rep_region")
+ }
+
+ private def queryUDT(regionName: String) {
+
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ val e1: Employee = new Employee("hello", 123)
+ val e2: Employee = new Employee("world", 456)
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Object] = OQLResult.collect()
+ oqlRS should have length 2
+ oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+ dataFrame.registerTempTable("employee")
+ val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 2
+ sqlRS should contain theSameElementsAs List("hello", "world")
+
+ List("1", "2").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") {
+ queryUDTDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") {
+ queryUDTDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryUDTDataFrame(regionName: String) {
+ //Populate some data in the region
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+ val e1: Employee = new Employee("hello", 123)
+ val e2: Employee = new Employee("world", 456)
+ rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName")
+
+ dataFrame.registerTempTable("employee")
+ val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.map(r => r(0)).collect()
+ sqlRS should have length 2
+ sqlRS should contain theSameElementsAs List("hello", "world")
+
+ List("1", "2").foreach(rgn.remove)
+ }
+
+ test("Geode OQL query with more complex UDT: Partitioned Region") {
+ complexUDT("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT: Replicated Region") {
+ complexUDT("obj_obj_rep_region")
+ }
+
+ private def complexUDT(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
+ oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+ sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+ }
+
+ test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") {
+ complexUDTDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") {
+ complexUDTDataFrame("obj_obj_rep_region")
+ }
+
+ private def complexUDTDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+ sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection: Partitioned Region") {
+ queryComplexUDTProjection("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection: Replicated Region") {
+ queryComplexUDTProjection("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTProjection(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create QueryRDD using OQL
+ val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
+
+ //verify the QueryRDD
+ val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
+ oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+ //Convert QueryRDD to DataFrame
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+ //Convert QueryRDD to DataFrame using RDDConverter
+ val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") {
+ queryComplexUDTProjectionDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") {
+ queryComplexUDTProjectionDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTProjectionDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") {
+ queryComplexUDTNestProjectionDataFrame("obj_obj_region")
+ }
+
+ test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") {
+ queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region")
+ }
+
+ private def queryComplexUDTNestProjectionDataFrame(regionName: String) {
+
+ initRegion(regionName)
+
+ //Create DataFrame using Geode OQL
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""")
+ dataFrame.registerTempTable("Portfolio")
+
+ val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
+
+ //Verify the SQL query result
+ val sqlRS = SQLResult.collect().map(r => r(0))
+ sqlRS should contain theSameElementsAs List(3)
+ }
+
+ test("Undefined instance deserialization: Partitioned Region") {
+ undefinedInstanceDeserialization("obj_obj_region")
+ }
+
+ test("Undefined instance deserialization: Replicated Region") {
+ undefinedInstanceDeserialization("obj_obj_rep_region")
+ }
+
+ private def undefinedInstanceDeserialization(regionName: String) {
+
+ val conn = GeodeConnectionConf(sc.getConf).getConnection
+ val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+ rgn.removeAll(rgn.keySetOnServer())
+
+ //Put some new data
+ rgn.put("1", "one")
+
+ //Query some non-existent columns, which should return UNDEFINED
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName")
+ val col1 = dataFrame.first().apply(0)
+ val col2 = dataFrame.first().apply(1)
+ assert(col1 == QueryService.UNDEFINED)
+ assert(col2 == QueryService.UNDEFINED)
+ //Verify that col1 and col2 refer to the same Undefined object
+ assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
+ }
+
+ test("RDD.saveToGeode") {
+ val regionName = "str_str_region"
+ // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66))
+ val data = (1 to 6).map(_.toString).map(e=> (e, e*2))
+ val rdd = sc.parallelize(data)
+ rdd.saveToGeode(regionName)
+
+ // verify
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
+ println("region key set on server: " + region.keySetOnServer())
+ assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+ (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
+ }
+
+ // ===========================================================
+ // DStream.saveToGeode() functional tests
+ // ===========================================================
+
+ test("Basic DStream test") {
+ import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
+ import io.pivotal.geode.spark.connector.streaming._
+ import org.apache.spark.streaming.ManualClockHelper
+
+ class TestStreamListener extends StreamingListener {
+ var count = 0
+ override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1
+ }
+
+ def batchDuration = Seconds(1)
+ val ssc = new StreamingContext(sc, batchDuration)
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ val dstream = new TestInputDStream(ssc, input, 2)
+ dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e))
+ try {
+ val listener = new TestStreamListener
+ ssc.addStreamingListener(listener)
+ ssc.start()
+ ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
+ while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
+ } catch {
+ case e: Exception => e.printStackTrace(); throw e
+// } finally {
+// ssc.stop()
+ }
+
+ val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
+ val conn = connConf.getConnection
+ val region: Region[String, Int] = conn.getRegionProxy("str_int_region")
+
+ // verify geode region contents
+ println("region key set on server: " + region.keySetOnServer())
+ assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+ (1 to 12).foreach(e => assert(e == region.get(e.toString)))
+ }
+}