You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:16:46 UTC
[11/50] [abbrv] incubator-geode git commit: GEODE-1244: Package,
directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
new file mode 100644
index 0000000..ff4cd17
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+
+
+trait GeodeConnection {
+
+ /**
+ * Validate region existence and key/value type constraints, throw RuntimeException
+ * if region does not exist or key and/or value type do(es) not match.
+ * @param regionPath the full path of region
+ */
+ def validateRegion[K, V](regionPath: String): Unit
+
+ /**
+ * Get Region proxy for the given region
+ * @param regionPath the full path of region
+ */
+ def getRegionProxy[K, V](regionPath: String): Region[K, V]
+
+ /**
+ * Retrieve region meta data for the given region.
+ * @param regionPath: the full path of the region
+ * @return Some[RegionMetadata] if region exists, None otherwise
+ */
+ def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+ /**
+ * Retrieve region data for the given region and bucket set
+ * @param regionPath: the full path of the region
+ * @param whereClause: the set of bucket IDs
+ * @param split: Geode RDD Partition instance
+ */
+ def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
+
+ def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+ /**
+ * Create a geode OQL query
+ * @param queryString Geode OQL query string
+ */
+ def getQuery(queryString: String): Query
+
+ /** Close the connection */
+ def close(): Unit
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
new file mode 100644
index 0000000..38d9e07
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
+
+/**
+ * Stores configuration of a connection to Geode cluster. It is serializable and can
+ * be safely sent over network.
+ *
+ * @param locators Geode locator host:port pairs, the default is (localhost,10334)
+ * @param geodeProps The initial geode properties to be used.
+ * @param connectionManager GeodeConnectionFactory instance
+ */
+class GeodeConnectionConf(
+ val locators: Seq[(String, Int)],
+ val geodeProps: Map[String, String] = Map.empty,
+ connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
+ ) extends Serializable {
+
+ /** require at least 1 pair of (host,port) */
+ require(locators.nonEmpty)
+
+ def getConnection: GeodeConnection = connectionManager.getConnection(this)
+
+}
+
+object GeodeConnectionConf {
+
+ /**
+ * create GeodeConnectionConf object based on locator string and optional GeodeConnectionFactory
+ * @param locatorStr Geode cluster locator string
+ * @param connectionManager GeodeConnection factory
+ */
+ def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
+ (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
+ new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
+ }
+
+ /**
+ * create GeodeConnectionConf object based on SparkConf. Note that implicit can
+ * be used to control what GeodeConnectionFactory instance to use if desired
+ * @param conf a SparkConf instance
+ */
+ def apply(conf: SparkConf): GeodeConnectionConf = {
+ val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
+ throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
+ // SparkConf only holds properties whose key starts with "spark.", In order to
+ // put geode properties in SparkConf, all geode properties are prefixes with
+ // "spark.geode.". This prefix was removed before the properties were put in `geodeProp`
+ val prefix = "spark.geode."
+ val geodeProps = conf.getAll.filter {
+ case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
+ }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+ apply(locatorStr, geodeProps)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
new file mode 100644
index 0000000..bf678f0
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+/**
+ * GeodeConnectionFactory provide an common interface that manages Geode
+ * connections, and it's serializable. Each factory instance will handle
+ * connection instance creation and connection pool management.
+ */
+trait GeodeConnectionManager extends Serializable {
+
+ /** get connection for the given connector */
+ def getConnection(connConf: GeodeConnectionConf): GeodeConnection
+
+ /** close the connection */
+ def closeConnection(connConf: GeodeConnectionConf): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
new file mode 100644
index 0000000..6e93b05
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GeodeFunctionDeployer {
+ def main(args: Array[String]) {
+ new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
+ }
+}
+
+class GeodeFunctionDeployer(val httpClient:HttpClient) extends Logging {
+
+ def deploy(host: String, port: Int, jarLocation: String): String =
+ deploy(host + ":" + port, jarLocation)
+
+ def deploy(host: String, port: Int, jar:File): String =
+ deploy(host + ":" + port, jar)
+
+ def deploy(jmxHostAndPort: String, jarLocation: String): String =
+ deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+
+ def deploy(jmxHostAndPort: String, jar: File): String = {
+ val urlString = constructURLString(jmxHostAndPort)
+ val filePost: PostMethod = new PostMethod(urlString)
+ val parts: Array[Part] = new Array[Part](1)
+ parts(0) = new FilePart("resources", jar)
+ filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
+ val status: Int = httpClient.executeMethod(filePost)
+ "Deployed Jar with status:" + status
+ }
+
+ private[connector] def constructURLString(jmxHostAndPort: String) =
+ "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+
+ private[connector]def jarFileHandle(jarLocation: String) = {
+ val f: File = new File(jarLocation)
+ if (!f.exists()) {
+ val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
+ logInfo(errorMessage)
+ throw new RuntimeException(errorMessage)
+ }
+ f
+ }
+
+ def commandLineRun(args: Array[String]):Unit = {
+ val (hostPort: String, jarFile: String) =
+ if (args.length < 2) {
+ logInfo("JMX Manager Host and Port (example: localhost:7070):")
+ val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
+ val jmxHostAndPort = bufferedReader.readLine()
+ logInfo("Location of geode-functions.jar:")
+ val functionJarLocation = bufferedReader.readLine()
+ (jmxHostAndPort, functionJarLocation)
+ } else {
+ (args(0), args(1))
+ }
+ val status = deploy(hostPort, jarFile)
+ logInfo(status)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
new file mode 100644
index 0000000..8c0aeca
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+class GeodeKryoRegistrator extends KryoRegistrator{
+
+ override def registerClasses(kyro: Kryo): Unit = {
+ kyro.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
new file mode 100644
index 0000000..ba5d2df
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
+
+ /**
+ * Save the RDD of pairs to Geode key-value store without any conversion
+ * @param regionPath the full path of region that the RDD is stored
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ def saveToGeode(
+ regionPath: String,
+ connConf: GeodeConnectionConf = defaultConnectionConf,
+ opConf: Map[String, String] = Map.empty): Unit = {
+ connConf.getConnection.validateRegion[K, V](regionPath)
+ if (log.isDebugEnabled)
+ logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
+ else
+ logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+ val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
+ rdd.sparkContext.runJob(rdd, writer.write _)
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this`
+ * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
+ * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
+ * Geode region.
+ *
+ *@param regionPath the region path of the Geode region
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K2 the key type of the Geode region
+ * @tparam V2 the value type of the Geode region
+ * @return RDD[T, V]
+ */
+ def joinGeodeRegion[K2 <: K, V2](
+ regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
+ new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+ * and the Geode `Region[K2, V2]`. The join key from RDD element is generated by
+ * `func(K, V) => K2`, and the key from the Geode region is jus the key of the
+ * key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+ * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K2 the key type of the Geode region
+ * @tparam V2 the value type of the Geode region
+ * @return RDD[(K, V), V2]
+ */
+ def joinGeodeRegion[K2, V2](
+ regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
+ new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+ /** This version of joinGeodeRegion(...) is just for Java API. */
+ private[connector] def joinGeodeRegion[K2, V2](
+ regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
+ new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+ }
+
+ /**
+ * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
+ * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key k.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K2 the key type of the Geode region
+ * @tparam V2 the value type of the Geode region
+ * @return RDD[ (K, V), Option[V] ]
+ */
+ def outerJoinGeodeRegion[K2 <: K, V2](
+ regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
+ new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+ }
+
+ /**
+ * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
+ * The join key from RDD element is generated by `func(K, V) => K2`, and the
+ * key from region is jus the key of the key/value pair.
+ *
+ * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+ * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+ * ((k, v), None)) if no element in the Geode region have key `func(k, v)`.
+ *
+ *@param regionPath the region path of the Geode region
+ * @param func the function that generates region key from RDD element (K, V)
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K2 the key type of the Geode region
+ * @tparam V2 the value type of the Geode region
+ * @return RDD[ (K, V), Option[V] ]
+ */
+ def outerJoinGeodeRegion[K2, V2](
+ regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+ new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+ }
+
+ /** This version of outerJoinGeodeRegion(...) is just for Java API. */
+ private[connector] def outerJoinGeodeRegion[K2, V2](
+ regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+ new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+ }
+
+ private[connector] def defaultConnectionConf: GeodeConnectionConf =
+ GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
new file mode 100644
index 0000000..2e5c92a
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra gemFire functions on non-Pair RDDs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+ /**
+ * Save the non-pair RDD to Geode key-value store.
+ * @param regionPath the full path of region that the RDD is stored
+ * @param func the function that converts elements of RDD to key/value pairs
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @param opConf the optional parameters for this operation
+ */
+ def saveToGeode[K, V](
+ regionPath: String,
+ func: T => (K, V),
+ connConf: GeodeConnectionConf = defaultConnectionConf,
+ opConf: Map[String, String] = Map.empty): Unit = {
+ connConf.getConnection.validateRegion[K, V](regionPath)
+ if (log.isDebugEnabled)
+ logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
+ else
+ logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+ val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
+ rdd.sparkContext.runJob(rdd, writer.write(func) _)
+ }
+
+ /** This version of saveToGeode(...) is just for Java API. */
+ private[connector] def saveToGeode[K, V](
+ regionPath: String,
+ func: PairFunction[T, K, V],
+ connConf: GeodeConnectionConf,
+ opConf: Map[String, String]): Unit = {
+ saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
+ }
+
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+ * and the Geode `Region[K, V]`. The join key from RDD element is generated by
+ * `func(T) => K`, and the key from the Geode region is just the key of the
+ * key/value pair.
+ *
+ * Each pair of elements of result RDD will be returned as a (t, v) tuple,
+ * where (t) is in `this` RDD and (k, v) is in the Geode region.
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generate region key from RDD element T
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K the key type of the Geode region
+ * @tparam V the value type of the Geode region
+ * @return RDD[T, V]
+ */
+ def joinGeodeRegion[K, V](regionPath: String, func: T => K,
+ connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] = {
+ new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+ }
+
+ /** This version of joinGeodeRegion(...) is just for Java API. */
+ private[connector] def joinGeodeRegion[K, V](
+ regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = {
+ joinGeodeRegion(regionPath, func.call _, connConf)
+ }
+
+ /**
+ * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
+ * The join key from RDD element is generated by `func(T) => K`, and the
+ * key from region is just the key of the key/value pair.
+ *
+ * For each element (t) in `this` RDD, the resulting RDD will either contain
+ * all pairs (t, Some(v)) for v in the Geode region, or the pair
+ * (t, None) if no element in the Geode region have key `func(t)`
+ *
+ * @param regionPath the region path of the Geode region
+ * @param func the function that generate region key from RDD element T
+ * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+ * @tparam K the key type of the Geode region
+ * @tparam V the value type of the Geode region
+ * @return RDD[ T, Option[V] ]
+ */
+ def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K,
+ connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
+ new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+ }
+
+ /** This version of outerJoinGeodeRegion(...) is just for Java API. */
+ private[connector] def outerJoinGeodeRegion[K, V](
+ regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
+ outerJoinGeodeRegion(regionPath, func.call _, connConf)
+ }
+
+ private[connector] def defaultConnectionConf: GeodeConnectionConf =
+ GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
new file mode 100644
index 0000000..83aab7a
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Provide Geode OQL specific functions
+ */
+class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
+
+ /**
+ * Expose a Geode OQL query result as a DataFrame
+ * @param query the OQL query string.
+ */
+ def geodeOQL(
+ query: String,
+ connConf: GeodeConnectionConf = GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
+ logInfo(s"OQL query = $query")
+ val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
+ sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
+ }
+
+ private[connector] def defaultConnectionConf: GeodeConnectionConf =
+ GeodeConnectionConf(sqlContext.sparkContext.getConf)
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
new file mode 100644
index 0000000..617cb33
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
+import org.apache.spark.SparkContext
+
+import scala.reflect.ClassTag
+
+/** Provides Geode specific methods on `SparkContext` */
+class GeodeSparkContextFunctions(@transient sc: SparkContext) extends Serializable {
+
+ /**
+ * Expose a Geode region as a GeodeRDD
+ * @param regionPath the full path of the region
+ * @param connConf the GeodeConnectionConf that can be used to access the region
+ * @param opConf use this to specify preferred partitioner
+ * and its parameters. The implementation will use it if it's applicable
+ */
+ def geodeRegion[K: ClassTag, V: ClassTag] (
+ regionPath: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf),
+ opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] =
+ GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
new file mode 100644
index 0000000..52f9961
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import java.net.InetAddress
+
+import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
+import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.{Region, RegionService}
+import com.gemstone.gemfire.internal.cache.execute.InternalExecution
+import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+import org.apache.spark.{SparkEnv, Logging}
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.geodefunctions._
+import java.util.{Set => JSet, List => JList }
+
+/**
+ * Default GeodeConnection implementation. The instance of this should be
+ * created by DefaultGeodeConnectionFactory
+ * @param locators pairs of host/port of locators
+ * @param gemFireProps The initial geode properties to be used.
+ */
+private[connector] class DefaultGeodeConnection (
+ locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
+ extends GeodeConnection with Logging {
+
+ private val clientCache = initClientCache()
+
+ /** Register Geode functions to the Geode cluster */
+ FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
+ FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
+
+ private def initClientCache() : ClientCache = {
+ try {
+ val ccf = getClientCacheFactory
+ ccf.create()
+ } catch {
+ case e: Exception =>
+ logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
+ throw new RuntimeException(e)
+ }
+ }
+
+ private def getClientCacheFactory: ClientCacheFactory = {
+ import io.pivotal.geode.spark.connector.map2Properties
+ val ccf = new ClientCacheFactory(gemFireProps)
+ ccf.setPoolReadTimeout(30000)
+ val servers = LocatorHelper.getAllGeodeServers(locators)
+ if (servers.isDefined && servers.get.size > 0) {
+ val sparkIp = System.getenv("SPARK_LOCAL_IP")
+ val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName
+ else InetAddress.getLocalHost.getCanonicalHostName
+ val executorId = SparkEnv.get.executorId
+ val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, hostName, executorId)
+ logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
+ logDebug(s"""Init ClientCache: all-severs=${pickedServers.mkString(",")}""")
+ pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) }
+ } else {
+ logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
+ locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
+ }
+ ccf
+ }
+
+ /** close the clientCache */
+ override def close(): Unit =
+ if (! clientCache.isClosed) clientCache.close()
+
+ /** ----------------------------------------- */
+ /** implementation of GeodeConnection trait */
+ /** ----------------------------------------- */
+
+ override def getQuery(queryString: String): Query =
+ clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
+
+ override def validateRegion[K, V](regionPath: String): Unit = {
+ val md = getRegionMetadata[K, V](regionPath)
+ if (! md.isDefined) throw new RuntimeException(s"The region named $regionPath was not found")
+ }
+
+ def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
+ import scala.collection.JavaConversions.setAsJavaSet
+ val region = getRegionProxy[K, V](regionPath)
+ val set0: JSet[Integer] = Set[Integer](0)
+ val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
+ exec.setWaitOnExceptionFlag(true)
+ try {
+ val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
+ val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
+ logDebug(r.get(0).toString)
+ Some(r.get(0))
+ } catch {
+ case e: FunctionException =>
+ if (e.getMessage.contains(s"The region named /$regionPath was not found")) None
+ else throw e
+ }
+ }
+
+ def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
+ val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+ if (region1 != null) region1
+ else DefaultGeodeConnection.regionLock.synchronized {
+ val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+ if (region2 != null) region2
+ else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath)
+ }
+ }
+
+ override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = {
+ val region = getRegionProxy[K, V](regionPath)
+ val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
+ val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
+ val collector = new StructStreamingResultCollector(desc)
+ // RetrieveRegionResultCollector[(K, V)]
+ import scala.collection.JavaConversions.setAsJavaSet
+ val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
+ .withBucketFilter(split.bucketSet.map(Integer.valueOf))
+ exec.setWaitOnExceptionFlag(true)
+ exec.execute(RetrieveRegionFunction.ID)
+ collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
+ }
+
+ override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
+ import scala.collection.JavaConversions.setAsJavaSet
+ FunctionService.registerFunction(QueryFunction.getInstance())
+ val collector = new QueryResultCollector
+ val region = getRegionProxy(regionPath)
+ val args: Array[String] = Array[String](queryString, bucketSet.toString)
+ val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
+ .withBucketFilter(bucketSet.map(Integer.valueOf))
+ .withArgs(args)
+ exec.execute(QueryFunction.ID)
+ collector.getResult
+ }
+}
+
+private[connector] object DefaultGeodeConnection {
+ /** a lock object only used by getRegionProxy...() */
+ private val regionLock = new Object
+}
+
+/** The purpose of this class is making unit test DefaultGeodeConnectionManager easier */
+class DefaultGeodeConnectionFactory {
+
+ def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
+ new DefaultGeodeConnection(locators, gemFireProps)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
new file mode 100644
index 0000000..eb67cda
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager}
+
+import scala.collection.mutable
+
+/**
+ * Default implementation of GeodeConnectionFactory
+ */
+class DefaultGeodeConnectionManager extends GeodeConnectionManager {
+
+ def getConnection(connConf: GeodeConnectionConf): GeodeConnection =
+ DefaultGeodeConnectionManager.getConnection(connConf)
+
+ def closeConnection(connConf: GeodeConnectionConf): Unit =
+ DefaultGeodeConnectionManager.closeConnection(connConf)
+
+}
+
+object DefaultGeodeConnectionManager {
+
+ /** connection cache, keyed by host:port pair */
+ private[connector] val connections = mutable.Map[(String, Int), GeodeConnection]()
+
+ /**
+ * use locator host:port pair to lookup cached connection. create new connection
+ * and add it to the cache `connections` if it does not exist.
+ */
+ def getConnection(connConf: GeodeConnectionConf)
+ (implicit factory: DefaultGeodeConnectionFactory = new DefaultGeodeConnectionFactory): GeodeConnection = {
+
+ def getCachedConnection(locators: Seq[(String, Int)]): GeodeConnection = {
+ val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
+ if (conns.nonEmpty) conns(0) else null
+ }
+
+ val conn1 = getCachedConnection(connConf.locators)
+ if (conn1 != null) conn1
+ else connections.synchronized {
+ val conn2 = getCachedConnection(connConf.locators)
+ if (conn2 != null) conn2
+ else {
+ val conn3 = factory.newConnection(connConf.locators, connConf.geodeProps)
+ connConf.locators.foreach(pair => connections += (pair -> conn3))
+ conn3
+ }
+ }
+ }
+
+ /**
+ * Close the connection and remove it from connection cache.
+ * Note: multiple entries may share the same connection, all those entries are removed.
+ */
+ def closeConnection(connConf: GeodeConnectionConf): Unit = {
+ val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
+ if (conns.nonEmpty) connections.synchronized {
+ conns(0).close()
+ connections.retain((k,v) => v != conns(0))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
new file mode 100644
index 0000000..71fed52
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import java.net.InetSocketAddress
+import java.util.{ArrayList => JArrayList}
+
+import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
+import com.gemstone.gemfire.distributed.internal.ServerLocation
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
+import org.apache.spark.Logging
+
+import scala.util.{Failure, Success, Try}
+
+
+object LocatorHelper extends Logging {
+
+ /** valid locator strings are: host[port] and host:port */
+ final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
+ final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
+
+ /** convert single locator string to Try[(host, port)] */
+ def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
+ locatorStr match {
+ case LocatorPattern1(host, domain, port) => Success((host, port.toInt))
+ case LocatorPattern2(host, domain, port) => Success((host, port.toInt))
+ case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
+ }
+
+ /**
+ * Parse locator strings and returns Seq of (hostname, port) pair.
+ * Valid locator string are one or more "host[port]" and/or "host:port"
+ * separated by `,`. For example:
+ * host1.mydomain.com[8888],host2.mydomain.com[8889]
+ * host1.mydomain.com:8888,host2.mydomain.com:8889
+ */
+ def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
+ locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
+
+
+ /**
+ * Return the list of live Geode servers for the given locators.
+ * @param locators locators for the given Geode cluster
+ * @param serverGroup optional server group name, default is "" (empty string)
+ */
+ def getAllGeodeServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
+ var result: Option[Seq[(String, Int)]] = None
+ locators.find { case (host, port) =>
+ try {
+ val addr = new InetSocketAddress(host, port)
+ val req = new GetAllServersRequest(serverGroup)
+ val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
+ if (res != null) {
+ import scala.collection.JavaConverters._
+ val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
+ if (servers.size > 0)
+ result = Some(servers.asScala.map(e => (e.getHostName, e.getPort)))
+ }
+ } catch { case e: Exception => logWarning("getAllGeodeServers error", e)
+ }
+ result.isDefined
+ }
+ result
+ }
+
+ /**
+ * Pick up at most 3 preferred servers from all available servers based on
+ * host name and Spark executor id.
+ *
+ * This method is used by DefaultGeodeConnection to create ClientCache. Usually
+ * one server is enough to initialize ClientCacheFactory, but this provides two
+ * backup servers in case of the 1st server can't be connected.
+ *
+ * @param servers all available servers in the form of (hostname, port) pairs
+ * @param hostName the host name of the Spark executor
+ * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
+ * @return Seq[(hostname, port)] of preferred servers
+ */
+ def pickPreferredGeodeServers(
+ servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {
+
+ // pick up `length` items form the Seq starts at the `start` position.
+ // The Seq is treated as a ring, so at most `Seq.size` items can be picked
+ def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = {
+ val size = math.min(seq.size, length)
+ (start until start + size).map(x => seq(x % seq.size))
+ }
+
+ // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
+ val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 }
+
+ // algorithm:
+ // 1. sort server list
+ // 2. split sorted server list into 3 sub-lists a, b, and c:
+ // list-a: servers on the given host
+ // list-b: servers that are in front of list-a on the sorted server list
+ // list-c: servers that are behind list-a on the sorted server list
+ // then rotate list-a based on executor id, then create new server list:
+ // modified list-a ++ list-c ++ list-b
+ // 3. if there's no server on the given host, then create new server list
+ // by rotating sorted server list based on executor id.
+ // 4. take up to 3 servers from the new server list
+ val sortedServers = servers.sorted
+ val firstIdx = sortedServers.indexWhere(p => p._1 == hostName)
+ val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName)
+
+ if (firstIdx < 0) { // no local server
+ circularTake(sortedServers, id, 3)
+ } else {
+ val (seq1, seq2) = sortedServers.splitAt(firstIdx)
+ val seq = if (firstIdx == lastIdx) { // one local server
+ seq2 ++ seq1
+ } else { // multiple local server
+ val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1)
+ val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size)
+ seq3b ++ seq4 ++ seq1
+ }
+ circularTake(seq, 0, 3)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
new file mode 100644
index 0000000..a8666fc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
+import com.gemstone.gemfire.cache.query.types.StructType
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender.
+ {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
+
+/**
+ * StructStreamingResultCollector and StructStreamingResultSender are paired
+ * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
+ * from Geode server to Spark Connector (the client of Geode server)
+ * in streaming, i.e., while sender sending the result, the collector can
+ * start processing the arrived result without waiting for full result to
+ * become available.
+ */
+class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
+
+ /** the constructor that provide default `desc` (description) */
+ def this() = this("StructStreamingResultCollector")
+
+ private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
+ var structType: StructType = null
+
+ /** ------------------------------------------ */
+ /** ResultCollector interface implementations */
+ /** ------------------------------------------ */
+
+ override def getResult: Iterator[Array[Object]] = resultIterator
+
+ override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] =
+ throw new UnsupportedOperationException()
+
+ /** addResult add non-empty byte array (chunk) to the queue */
+ override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
+ if (chunk != null && chunk.size > 1) {
+ this.queue.add(chunk)
+ // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
+ }
+
+ /** endResults add special `Array.empty` to the queue as marker of end of data */
+ override def endResults(): Unit = this.queue.add(Array.empty)
+
+ override def clearResults(): Unit = this.queue.clear()
+
+ /** ------------------------------------------ */
+ /** Internal methods */
+ /** ------------------------------------------ */
+
+ def getResultType: StructType = {
+ // trigger lazy resultIterator initialization if necessary
+ if (structType == null) resultIterator.hasNext
+ structType
+ }
+
+ /**
+ * Note: The data is sent in chunks, and each chunk contains multiple
+ * records. So the result iterator is an iterator (I) of iterator (II),
+ * i.e., go through each chunk (iterator (I)), and for each chunk, go
+ * through each record (iterator (II)).
+ */
+ private lazy val resultIterator = new Iterator[Array[Object]] {
+
+ private var currentIterator: Iterator[Array[Object]] = nextIterator()
+
+ override def hasNext: Boolean = {
+ if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
+ currentIterator.hasNext
+ }
+
+ /** Note: make sure call `hasNext` first to adjust `currentIterator` */
+ override def next(): Array[Object] = currentIterator.next()
+ }
+
+ /** get the iterator for the next chunk of data */
+ private def nextIterator(): Iterator[Array[Object]] = {
+ val chunk: Array[Byte] = queue.take
+ if (chunk.isEmpty) {
+ Iterator.empty
+ } else {
+ val input = new ByteArrayDataInput()
+ input.initialize(chunk, Version.CURRENT)
+ val chunkType = input.readByte()
+ // println(s"chunk type $chunkType")
+ chunkType match {
+ case TYPE_CHUNK =>
+ if (structType == null)
+ structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
+ nextIterator()
+ case DATA_CHUNK =>
+ // require(structType != null && structType.getFieldNames.length > 0)
+ if (structType == null) structType = StructStreamingResultSender.KeyValueType
+ chunkToIterator(input, structType.getFieldNames.length)
+ case ERROR_CHUNK =>
+ val error = DataSerializer.readObject(input).asInstanceOf[Exception]
+ errorPropagationIterator(error)
+ case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
+ }
+ }
+ }
+
+ /** create a iterator that propagate sender's exception */
+ private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
+ val re = new RuntimeException(ex)
+ override def hasNext: Boolean = throw re
+ override def next(): Array[Object] = throw re
+ }
+
+ /** convert a chunk of data to an iterator */
+ private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
+ override def hasNext: Boolean = input.available() > 0
+ val tmpInput = new ByteArrayDataInput()
+ override def next(): Array[Object] =
+ (0 until rowSize).map { ignore =>
+ val b = input.readByte()
+ b match {
+ case SER_DATA =>
+ val arr: Array[Byte] = DataSerializer.readByteArray(input)
+ tmpInput.initialize(arr, Version.CURRENT)
+ DataSerializer.readObject(tmpInput).asInstanceOf[Object]
+ case UNSER_DATA =>
+ DataSerializer.readObject(input).asInstanceOf[Object]
+ case BYTEARR_DATA =>
+ DataSerializer.readByteArray(input).asInstanceOf[Object]
+ case _ =>
+ throw new RuntimeException(s"unknown data type $b")
+ }
+ }.toArray
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
new file mode 100644
index 0000000..3f6dfad
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import scala.util.parsing.combinator.RegexParsers
+
+class QueryParser extends RegexParsers {
+
+ def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ {
+ _.toString
+ }
+
+ val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
+
+ val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
+
+ val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
+
+ val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
+
+ val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
+
+ def PACKAGE: Parser[String] = """[\w.]+""".r
+
+ def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ {
+ _.toString
+ }
+
+ def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ {
+ _.toString
+ }
+
+ def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
+
+ def alias: Parser[String] = not(where) ~> """[\w]+""".r
+
+ def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
+}
+
+object QueryParser extends QueryParser {
+
+ def parseOQL(expression: String) = parseAll(query, expression)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
new file mode 100644
index 0000000..474aa6a
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, ServerSplitsPartitioner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, SparkContext, Partition}
+import scala.reflect.ClassTag
+
+/**
+ * An RDD that provides the functionality that read the OQL query result
+ *
+ * @param sc The SparkContext this RDD is associated with
+ * @param queryString The OQL query string
+ * @param connConf The GeodeConnectionConf that provide the GeodeConnection
+ */
+class QueryRDD[T](@transient sc: SparkContext,
+ queryString: String,
+ connConf: GeodeConnectionConf)
+ (implicit ct: ClassTag[T])
+ extends RDD[T](sc, Seq.empty) {
+
+ override def getPartitions: Array[Partition] = {
+ val conn = connConf.getConnection
+ val regionPath = getRegionPathFromQuery(queryString)
+ val md = conn.getRegionMetadata(regionPath)
+ md match {
+ case Some(metadata) =>
+ if (metadata.isPartitioned) {
+ val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
+ logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
+ splits
+ }
+ else {
+ logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
+ Array[Partition](new GeodeRDDPartition(0, Set.empty))
+
+ }
+ case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
+ }
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ val buckets = split.asInstanceOf[GeodeRDDPartition].bucketSet
+ val regionPath = getRegionPathFromQuery(queryString)
+ val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
+ result match {
+ case it: Iterator[T] =>
+ logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
+ it
+ case _ =>
+ throw new RuntimeException("Unexpected OQL result: " + result.toString)
+ }
+ }
+
+ private def getRegionPathFromQuery(queryString: String): String = {
+ val r = QueryParser.parseOQL(queryString).get
+ r match {
+ case r: String =>
+ val start = r.indexOf("/") + 1
+ var end = r.indexOf(")")
+ if (r.indexOf(".") > 0) end = math.min(r.indexOf("."), end)
+ if (r.indexOf(",") > 0) end = math.min(r.indexOf(","), end)
+ val regionPath = r.substring(start, end)
+ regionPath
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
new file mode 100644
index 0000000..bedc58d
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+
+class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]]{
+
+ private val queue = new LinkedBlockingDeque[Array[Byte]]()
+
+ override def getResult = resultIterator
+
+ override def getResult(timeout: Long, unit: TimeUnit) = throw new UnsupportedOperationException
+
+ override def addResult(memberID: DistributedMember , chunk: Array[Byte]) =
+ if (chunk != null && chunk.size > 0) {
+ queue.add(chunk)
+ }
+
+ override def endResults = queue.add(Array.empty)
+
+
+ override def clearResults = queue.clear
+
+ private lazy val resultIterator = new Iterator[Object] {
+ private var currentIterator = nextIterator
+ def hasNext = {
+ if (!currentIterator.hasNext && currentIterator != Iterator.empty)
+ currentIterator = nextIterator
+ currentIterator.hasNext
+ }
+ def next = currentIterator.next
+ }
+
+ private def nextIterator: Iterator[Object] = {
+ val chunk = queue.take
+ if (chunk.isEmpty) {
+ Iterator.empty
+ }
+ else {
+ val input = new ByteArrayDataInput
+ input.initialize(chunk, Version.CURRENT)
+ new Iterator[Object] {
+ override def hasNext: Boolean = input.available() > 0
+ override def next: Object = DataSerializer.readObject(input).asInstanceOf[Object]
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
new file mode 100644
index 0000000..6a1611c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+
+import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis
+
+case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
+
+ override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
+
+ override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
+
+}
+
+object RDDConverter {
+
+ def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = {
+ sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
new file mode 100644
index 0000000..e54411c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
+class RowBuilder[T](queryRDD: QueryRDD[T]) {
+
+ /**
+ * Convert QueryRDD to RDD of Row
+ * @return RDD of Rows
+ */
+ def toRowRDD(): RDD[Row] = {
+ val rowRDD = queryRDD.map(row => {
+ row match {
+ case si: StructImpl => Row.fromSeq(si.getFieldValues)
+ case obj: Object => Row.fromSeq(Seq(obj))
+ }
+ })
+ rowRDD
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
new file mode 100644
index 0000000..3ca20b7
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.sql.types._
+import scala.collection.mutable.ListBuffer
+import org.apache.spark.Logging
+
+class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {
+
+ val nullStructType = StructType(Nil)
+
+ val typeMap:Map[Class[_], DataType] = Map(
+ (classOf[java.lang.String], StringType),
+ (classOf[java.lang.Integer], IntegerType),
+ (classOf[java.lang.Short], ShortType),
+ (classOf[java.lang.Long], LongType),
+ (classOf[java.lang.Double], DoubleType),
+ (classOf[java.lang.Float], FloatType),
+ (classOf[java.lang.Boolean], BooleanType),
+ (classOf[java.lang.Byte], ByteType),
+ (classOf[java.util.Date], DateType),
+ (classOf[java.lang.Object], nullStructType)
+ )
+
+ /**
+ * Analyse QueryRDD to get the Spark schema
+ * @return The schema represented by Spark StructType
+ */
+ def toSparkSchema(): StructType = {
+ val row = queryRDD.first()
+ val tpe = row match {
+ case r: StructImpl => constructFromStruct(r)
+ case null => StructType(StructField("col1", NullType) :: Nil)
+ case default =>
+ val value = typeMap.getOrElse(default.getClass(), nullStructType)
+ StructType(StructField("col1", value) :: Nil)
+ }
+ logInfo(s"Schema: $tpe")
+ tpe
+ }
+
+ def constructFromStruct(r:StructImpl) = {
+ val names = r.getFieldNames
+ val values = r.getFieldValues
+ val lb = new ListBuffer[StructField]()
+ for (i <- 0 until names.length) {
+ val name = names(i)
+ val value = values(i)
+ val dataType = value match {
+ case null => NullType
+ case default => typeMap.getOrElse(default.getClass, nullStructType)
+ }
+ lb += StructField(name, dataType)
+ }
+ StructType(lb.toSeq)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
new file mode 100644
index 0000000..37dec42
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import com.esotericsoftware.kryo.io.{Output, Input}
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+/**
+ * This is the customized serializer to serialize QueryService.UNDEFINED,
+ * i.e. com.gemstone.gemfire.cache.query.internal.Undefined, in order to
+ * guarantee the singleton Undefined after its deserialization within Spark.
+ */
+class UndefinedSerializer extends Serializer[Undefined] {
+
+ def write(kryo: Kryo, output: Output, u: Undefined) {
+ //Only serialize a byte for Undefined
+ output.writeByte(u.getDSFID)
+ }
+
+ def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
+ //Read DSFID of Undefined
+ input.readByte()
+ QueryService.UNDEFINED match {
+ case null => new Undefined
+ case _ =>
+ //Avoid calling Undefined constructor again.
+ QueryService.UNDEFINED.asInstanceOf[Undefined]
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
new file mode 100644
index 0000000..e9dd658
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T]
+ * and the specified Geode Region[K, V].
+ */
+class GeodeJoinRDD[T, K, V] private[connector]
+ ( left: RDD[T],
+ func: T => K,
+ val regionPath: String,
+ val connConf: GeodeConnectionConf
+ ) extends RDD[(T, V)](left.context, left.dependencies) {
+
+ /** validate region existence when GeodeRDD object is created */
+ validate()
+
+ /** Validate region, and make sure it exists. */
+ private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+ override protected def getPartitions: Array[Partition] = left.partitions
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
+ val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+ if (func == null) computeWithoutFunc(split, context, region)
+ else computeWithFunc(split, context, region)
+ }
+
+ /** T is (K, V1) since there's no map function `func` */
+ private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+ val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+ val leftKeys = leftPairs.map { case (k, v) => k}.toSet
+ // Note: get all will return (key, null) for non-exist entry, so remove those entries
+ val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
+ leftPairs.filter{case (k, v) => rightPairs.contains(k)}
+ .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator
+ }
+
+ private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+ val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
+ val leftKeys = leftPairs.map { case (t, k) => k}.toSet
+ // Note: get all will return (key, null) for non-exist entry, so remove those entries
+ val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
+ leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
new file mode 100644
index 0000000..3d61d47
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * An `RDD[ T, Option[V] ]` that represents the result of a left outer join
+ * between `left` RDD[T] and the specified Geode Region[K, V].
+ */
+class GeodeOuterJoinRDD[T, K, V] private[connector]
+ ( left: RDD[T],
+ func: T => K,
+ val regionPath: String,
+ val connConf: GeodeConnectionConf
+ ) extends RDD[(T, Option[V])](left.context, left.dependencies) {
+
+ /** validate region existence when GeodeRDD object is created */
+ validate()
+
+ /** Validate region, and make sure it exists. */
+ private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+ override protected def getPartitions: Array[Partition] = left.partitions
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
+ val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+ if (func == null) computeWithoutFunc(split, context, region)
+ else computeWithFunc(split, context, region)
+ }
+
+ /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */
+ private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+ val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+ val leftKeys = leftPairs.map { case (k, v) => k}.toSet
+ // Note: get all will return (key, null) for non-exist entry
+ val rightPairs = region.getAll(leftKeys)
+ // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
+ leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator
+ }
+
+ private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+ val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
+ val leftKeys = leftPairs.map { case (t, k) => k}.toSet
+ // Note: get all will return (key, null) for non-exist entry
+ val rightPairs = region.getAll(leftKeys)
+ // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
+ leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator
+ }
+}
+