You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:36 UTC
[07/14] incubator-geode git commit: GEODE-37 change package name from
io.pivotal.geode (for ./geode-spark-connector/src/main/java/io/pivotal)to
org.apache.geode for(to ./geode-spark-connector/src/main/java/org/apache)
GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/main/java/io/pivotal)to org.apache.geode for(to ./geode-spark-connector/src/main/java/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f7eaa26c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f7eaa26c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f7eaa26c
Branch: refs/heads/develop
Commit: f7eaa26c0009a8a4ec4f0c692de2c07c1ca4eb78
Parents: 03e60a6
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700
----------------------------------------------------------------------
.../javaapi/GeodeJavaDStreamFunctions.java | 86 -------
.../javaapi/GeodeJavaPairDStreamFunctions.java | 77 ------
.../javaapi/GeodeJavaPairRDDFunctions.java | 238 -------------------
.../javaapi/GeodeJavaRDDFunctions.java | 178 --------------
.../javaapi/GeodeJavaSQLContextFunctions.java | 49 ----
.../javaapi/GeodeJavaSparkContextFunctions.java | 87 -------
.../spark/connector/javaapi/GeodeJavaUtil.java | 122 ----------
.../javaapi/GeodeJavaDStreamFunctions.java | 86 +++++++
.../javaapi/GeodeJavaPairDStreamFunctions.java | 77 ++++++
.../javaapi/GeodeJavaPairRDDFunctions.java | 238 +++++++++++++++++++
.../javaapi/GeodeJavaRDDFunctions.java | 178 ++++++++++++++
.../javaapi/GeodeJavaSQLContextFunctions.java | 49 ++++
.../javaapi/GeodeJavaSparkContextFunctions.java | 87 +++++++
.../spark/connector/javaapi/GeodeJavaUtil.java | 122 ++++++++++
14 files changed, 837 insertions(+), 837 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
deleted file mode 100644
index e7c7cf9..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaDStreamFunctions<T> {
-
- public final GeodeDStreamFunctions<T> dsf;
-
- public GeodeJavaDStreamFunctions(JavaDStream<T> ds) {
- this.dsf = new GeodeDStreamFunctions<T>(ds.dstream());
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
- dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
- dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func) {
- dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
deleted file mode 100644
index 2c83255..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairDStreamFunctions<K, V> {
-
- public final GeodePairDStreamFunctions<K, V> dsf;
-
- public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
- this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream());
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
- dsf.saveToGeode(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param opConf the optional parameters for this operation
- */
- public void saveToGeode(String regionPath, Properties opConf) {
- dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to Geode key-value store.
- * @param regionPath the full path of region that the DStream is stored
- */
- public void saveToGeode(String regionPath) {
- dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
deleted file mode 100644
index 3278a5b..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodePairRDDFunctions;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import scala.Option;
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairRDDFunctions<K, V> {
-
- public final GeodePairRDDFunctions<K, V> rddf;
-
- public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
- this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the parameters for this operation
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param opConf the parameters for this operation
- */
- public void saveToGeode(String regionPath, Properties opConf) {
- rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
- rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the pair RDD to Geode key-value store with the default GeodeConnector.
- * @param regionPath the full path of region that the RDD is stored
- */
- public void saveToGeode(String regionPath) {
- rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
- return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, GeodeConnectionConf connConf) {
- GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
- GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key k.
- *
- * @param regionPath the region path of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
- return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key k.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K2> the key type of the Geode region
- * @param <V2> the value type of the Geode region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
deleted file mode 100644
index e4f6f36..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodeRDDFunctions;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Option;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaRDDFunctions<T> {
-
- public final GeodeRDDFunctions<T> rddf;
-
- public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
- this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
- rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
- rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGeode(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to Geode key-value store with default GeodeConnector.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- */
- public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
- rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
- return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the Geode
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
- String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
- GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the Geode region, or the pair
- * (t, None) if no element in the Geode region have key `func(t)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) {
- return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the Geode region, or the pair
- * (t, None) if no element in the Geode region have key `func(t)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param <K> the key type of the Geode region
- * @param <V> the value type of the Geode region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
- String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
- GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<Option<V>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
deleted file mode 100644
index 3471bf90..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-
-/**
- * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
- * OQL functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSQLContextFunctions {
-
- public final GeodeSQLContextFunctions scf;
-
- public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
- scf = new GeodeSQLContextFunctions(sqlContext);
- }
-
- public <T> DataFrame geodeOQL(String query) {
- DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
- return df;
- }
-
- public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
- DataFrame df = scf.geodeOQL(query, connConf);
- return df;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
deleted file mode 100644
index ce6b1ff..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD;
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
-import org.apache.spark.SparkContext;
-import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-import scala.reflect.ClassTag;
-import java.util.Properties;
-
-/**
- * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
- * Connector functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSparkContextFunctions {
-
- public final SparkContext sc;
-
- public GeodeJavaSparkContextFunctions(SparkContext sc) {
- this.sc = sc;
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf that can be used to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
- String regionPath, GeodeConnectionConf connConf, Properties opConf) {
- ClassTag<K> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply(
- sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
- return new GeodeJavaRegionRDD<>(rdd);
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner.
- * @param regionPath the full path of the region
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
- GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
- return geodeRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf that can be used to access the region
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
- return geodeRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a Geode region as a JavaPairRDD with default GeodeConnector.
- * @param regionPath the full path of the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
- GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
- return geodeRegion(regionPath, connConf, opConf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
deleted file mode 100644
index 41fe7e5..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import scala.Tuple2;
-
-import io.pivotal.geode.spark.connector.package$;
-
-/**
- * The main entry point to Spark Geode Connector Java API.
- *
- * There are several helpful static factory methods which build useful wrappers
- * around Spark Context, Streaming Context and RDD. There are also helper methods
- * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
- */
-public final class GeodeJavaUtil {
-
- /** constants */
- public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
- // partitioner related keys and values
- public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
- public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
- public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
- public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
- public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
- public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
-
- /** The private constructor is used prevents user from creating instance of this class. */
- private GeodeJavaUtil() { }
-
- /**
- * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
- * on an existing {@link SparkContext} instance.
- */
- public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
- return new GeodeJavaSparkContextFunctions(sc);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
- * on an existing {@link JavaSparkContext} instance.
- */
- public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
- return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
- */
- public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
- return new GeodeJavaPairRDDFunctions<K, V>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaRDD} instance.
- */
- public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
- return new GeodeJavaRDDFunctions<T>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
- */
- public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
- return new GeodeJavaPairDStreamFunctions<>(ds);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
- */
- public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
- return new GeodeJavaDStreamFunctions<>(ds);
- }
-
- /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>.
- */
- public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
- return JavaAPIHelper.toJavaPairRDD(rdd);
- }
-
- /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>.
- */
- public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
- return JavaAPIHelper.toJavaPairDStream(ds);
- }
-
- /**
- * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
- * on an existing {@link SQLContext} instance.
- */
- public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
- return new GeodeJavaSQLContextFunctions(sqlContext);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
new file mode 100644
index 0000000..e7c7cf9
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
+ * to provide Geode Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaDStreamFunctions<T> {
+
+ public final GeodeDStreamFunctions<T> dsf;
+
+ public GeodeJavaDStreamFunctions(JavaDStream<T> ds) {
+ this.dsf = new GeodeDStreamFunctions<T>(ds.dstream());
+ }
+
+ /**
+ * Save the JavaDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
+ dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+ * @param opConf the optional parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+ dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
+ dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the JavaDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func) {
+ dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
new file mode 100644
index 0000000..2c83255
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
+ * to provide Geode Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaPairDStreamFunctions<K, V> {
+
+ public final GeodePairDStreamFunctions<K, V> dsf;
+
+ public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
+ this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream());
+ }
+
+ /**
+ * Save the JavaPairDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+ dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaPairDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
+ dsf.saveToGeode(regionPath, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the JavaPairDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ * @param opConf the optional parameters for this operation
+ */
+ public void saveToGeode(String regionPath, Properties opConf) {
+ dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the JavaPairDStream to Geode key-value store.
+ * @param regionPath the full path of region that the DStream is stored
+ */
+ public void saveToGeode(String regionPath) {
+ dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
new file mode 100644
index 0000000..3278a5b
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodePairRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import scala.Option;
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaPairRDDFunctions<K, V> {
+
+ public final GeodePairRDDFunctions<K, V> rddf;
+
+ public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
+ this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
+ }
+
+ /**
+ * Save the pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the parameters for this operation
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+ rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param opConf the parameters for this operation
+ */
+ public void saveToGeode(String regionPath, Properties opConf) {
+ rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ */
+ public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
+ rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the pair RDD to Geode key-value store with the default GeodeConnector.
+ * @param regionPath the full path of region that the RDD is stored
+ */
+ public void saveToGeode(String regionPath) {
+ rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+ /**
+ * Return an JavaPairRDD containing all pairs of elements with matching keys in
+ * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
+ * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+ * (k, v2) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<<K, V>, V2>
+ */
+ public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
+ return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Return an JavaPairRDD containing all pairs of elements with matching keys in
+ * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
+ * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+ * (k, v2) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<<K, V>, V2>
+ */
+ public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+ String regionPath, GeodeConnectionConf connConf) {
+ GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
+ ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+ ClassTag<V2> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in this
+ * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
+ * element is generated by `func(K, V) => K2`, and the key from the Geode
+ * region is just the key of the key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+ * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param <K2> the key type of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, V2>
+ */
+ public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+ String regionPath, Function<Tuple2<K, V>, K2> func) {
+ return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in this
+ * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD
+ * element is generated by `func(K, V) => K2`, and the key from the Geode
+ * region is just the key of the key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+ * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <K2> the key type of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, V2>
+ */
+ public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+ String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+ GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+ ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+ ClassTag<V2> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+ /**
+ * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
+ * For each element (k, v) in this RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key k.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
+ */
+ public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
+ return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
+ * For each element (k, v) in this RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key k.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
+ */
+ public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+ String regionPath, GeodeConnectionConf connConf) {
+ GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
+ ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+ ClassTag<Option<V2>> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+ /**
+ * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
+ * The join key from RDD element is generated by `func(K, V) => K2`, and the
+ * key from region is just the key of the key/value pair.
+ *
+ * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param <K2> the key type of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
+ */
+ public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+ String regionPath, Function<Tuple2<K, V>, K2> func) {
+ return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
+ * The join key from RDD element is generated by `func(K, V) => K2`, and the
+ * key from region is just the key of the key/value pair.
+ *
+ * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <K2> the key type of the Geode region
+ * @param <V2> the value type of the Geode region
+ * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
+ */
+ public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+ String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+ GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+ ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+ ClassTag<Option<V2>> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
new file mode 100644
index 0000000..e4f6f36
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaRDDFunctions<T> {
+
+ public final GeodeRDDFunctions<T> rddf;
+
+ public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
+ this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
+ }
+
+ /**
+ * Save the non-pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
+ rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the non-pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
+ rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
+ }
+
+ /**
+ * Save the non-pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+ * @param opConf the parameters for this operation
+ */
+ public <K, V> void saveToGeode(
+ String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+ rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+ }
+
+ /**
+ * Save the non-pair RDD to Geode key-value store with default GeodeConnector.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+ */
+ public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
+ rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in this
+ * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
+ * element is generated by `func(T) => K`, and the key from the Geode
+ * region is just the key of the key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
+ * where t is from this RDD and v is from the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element T
+ * @param <K> the key type of the Geode region
+ * @param <V> the value type of the Geode region
+ * @return JavaPairRDD<T, V>
+ */
+ public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
+ return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in this
+ * RDD<T> and the Geode `Region<K, V>`. The join key from RDD
+ * element is generated by `func(T) => K`, and the key from the Geode
+ * region is just the key of the key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
+ * where t is from this RDD and v is from the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element T
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <K> the key type of the Geode region
+ * @param <V> the value type of the Geode region
+ * @return JavaPairRDD<T, V>
+ */
+ public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
+ String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+ GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+ ClassTag<T> kt = fakeClassTag();
+ ClassTag<V> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+ /**
+ * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
+ * The join key from RDD element is generated by `func(T) => K`, and the
+ * key from region is just the key of the key/value pair.
+ *
+ * For each element (t) in this RDD, the resulting RDD will either contain
+ * all pairs (t, Some(v)) for v in the Geode region, or the pair
+ * (t, None) if no element in the Geode region have key `func(t)`.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element T
+ * @param <K> the key type of the Geode region
+ * @param <V> the value type of the Geode region
+ * @return JavaPairRDD<T, Option<V>>
+ */
+ public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) {
+ return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+ }
+
+ /**
+ * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
+ * The join key from RDD element is generated by `func(T) => K`, and the
+ * key from region is just the key of the key/value pair.
+ *
+ * For each element (t) in this RDD, the resulting RDD will either contain
+ * all pairs (t, Some(v)) for v in the Geode region, or the pair
+ * (t, None) if no element in the Geode region have key `func(t)`.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element T
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param <K> the key type of the Geode region
+ * @param <V> the value type of the Geode region
+ * @return JavaPairRDD<T, Option<V>>
+ */
+ public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
+ String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+ GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+ ClassTag<T> kt = fakeClassTag();
+ ClassTag<Option<V>> vt = fakeClassTag();
+ return new JavaPairRDD<>(rdd, kt, vt);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
new file mode 100644
index 0000000..3471bf90
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
+ * OQL functionality.
+ *
+ * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSQLContextFunctions {
+
+ public final GeodeSQLContextFunctions scf;
+
+ public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
+ scf = new GeodeSQLContextFunctions(sqlContext);
+ }
+
+ public <T> DataFrame geodeOQL(String query) {
+ DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
+ return df;
+ }
+
+ public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
+ DataFrame df = scf.geodeOQL(query, connConf);
+ return df;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
new file mode 100644
index 0000000..ce6b1ff
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
+import org.apache.spark.SparkContext;
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+import scala.reflect.ClassTag;
+import java.util.Properties;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
+ * Connector functionality.
+ *
+ * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSparkContextFunctions {
+
+ public final SparkContext sc;
+
+ public GeodeJavaSparkContextFunctions(SparkContext sc) {
+ this.sc = sc;
+ }
+
+ /**
+ * Expose a Geode region as a JavaPairRDD
+ * @param regionPath the full path of the region
+ * @param connConf the GeodeConnectionConf that can be used to access the region
+ * @param opConf the parameters for this operation, such as preferred partitioner.
+ */
+ public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
+ String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+ ClassTag<K> kt = fakeClassTag();
+ ClassTag<V> vt = fakeClassTag();
+ GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply(
+ sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
+ return new GeodeJavaRegionRDD<>(rdd);
+ }
+
+ /**
+ * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner.
+ * @param regionPath the full path of the region
+ */
+ public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
+ GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+ return geodeRegion(regionPath, connConf, new Properties());
+ }
+
+ /**
+ * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
+ * @param regionPath the full path of the region
+ * @param connConf the GeodeConnectionConf that can be used to access the region
+ */
+ public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
+ return geodeRegion(regionPath, connConf, new Properties());
+ }
+
+ /**
+ * Expose a Geode region as a JavaPairRDD with default GeodeConnector.
+ * @param regionPath the full path of the region
+ * @param opConf the parameters for this operation, such as preferred partitioner.
+ */
+ public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
+ GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+ return geodeRegion(regionPath, connConf, opConf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
new file mode 100644
index 0000000..41fe7e5
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import scala.Tuple2;
+
+import io.pivotal.geode.spark.connector.package$;
+
+/**
+ * The main entry point to Spark Geode Connector Java API.
+ *
+ * There are several helpful static factory methods which build useful wrappers
+ * around Spark Context, Streaming Context and RDD. There are also helper methods
+ * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
+ */
+public final class GeodeJavaUtil {
+
+ /** constants */
+ public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
+ // partitioner related keys and values
+ public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
+ public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
+ public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
+ public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
+ public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
+ public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
+
+ /** The private constructor is used prevents user from creating instance of this class. */
+ private GeodeJavaUtil() { }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+ * on an existing {@link SparkContext} instance.
+ */
+ public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
+ return new GeodeJavaSparkContextFunctions(sc);
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+ * on an existing {@link JavaSparkContext} instance.
+ */
+ public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
+ return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
+ * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
+ */
+ public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
+ return new GeodeJavaPairRDDFunctions<K, V>(rdd);
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
+ * existing {@link org.apache.spark.api.java.JavaRDD} instance.
+ */
+ public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
+ return new GeodeJavaRDDFunctions<T>(rdd);
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
+ * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
+ */
+ public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
+ return new GeodeJavaPairDStreamFunctions<>(ds);
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
+ * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
+ */
+ public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
+ return new GeodeJavaDStreamFunctions<>(ds);
+ }
+
+ /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>>
+ * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>.
+ */
+ public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
+ return JavaAPIHelper.toJavaPairRDD(rdd);
+ }
+
+ /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>>
+ * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>.
+ */
+ public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
+ return JavaAPIHelper.toJavaPairDStream(ds);
+ }
+
+ /**
+ * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
+ * on an existing {@link SQLContext} instance.
+ */
+ public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
+ return new GeodeJavaSQLContextFunctions(sqlContext);
+ }
+
+}