Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:41 UTC
[12/14] incubator-geode git commit: GEODE-37 change package name from
io.pivotal.geode (in ./geode-spark-connector/src/main/scala/io/pivotal) to
org.apache.geode (in ./geode-spark-connector/src/main/scala/org/apache)
GEODE-37 change package name from io.pivotal.geode (in ./geode-spark-connector/src/main/scala/io/pivotal) to org.apache.geode (in ./geode-spark-connector/src/main/scala/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/106cc60f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/106cc60f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/106cc60f
Branch: refs/heads/develop
Commit: 106cc60fc92ed8e594db44ceeed8d53df43188aa
Parents: f7eaa26
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700
----------------------------------------------------------------------
.../geode/spark/connector/GeodeConnection.scala | 67 --------
.../spark/connector/GeodeConnectionConf.scala | 73 ---------
.../connector/GeodeConnectionManager.scala | 31 ----
.../spark/connector/GeodeFunctionDeployer.scala | 81 ---------
.../spark/connector/GeodeKryoRegistrator.scala | 29 ----
.../spark/connector/GeodePairRDDFunctions.scala | 140 ----------------
.../spark/connector/GeodeRDDFunctions.scala | 120 --------------
.../connector/GeodeSQLContextFunctions.scala | 42 -----
.../connector/GeodeSparkContextFunctions.scala | 39 -----
.../internal/DefaultGeodeConnection.scala | 164 -------------------
.../DefaultGeodeConnectionManager.scala | 77 ---------
.../connector/internal/LocatorHelper.scala | 135 ---------------
.../StructStreamingResultCollector.scala | 152 -----------------
.../connector/internal/oql/QueryParser.scala | 58 -------
.../spark/connector/internal/oql/QueryRDD.scala | 83 ----------
.../internal/oql/QueryResultCollector.scala | 69 --------
.../connector/internal/oql/RDDConverter.scala | 40 -----
.../connector/internal/oql/RowBuilder.scala | 38 -----
.../connector/internal/oql/SchemaBuilder.scala | 73 ---------
.../internal/oql/UndefinedSerializer.scala | 46 ------
.../connector/internal/rdd/GeodeJoinRDD.scala | 67 --------
.../internal/rdd/GeodeOuterJoinRDD.scala | 69 --------
.../internal/rdd/GeodeRDDPartition.scala | 36 ----
.../internal/rdd/GeodeRDDPartitioner.scala | 59 -------
.../internal/rdd/GeodeRDDPartitionerImpl.scala | 89 ----------
.../connector/internal/rdd/GeodeRDDWriter.scala | 82 ----------
.../connector/internal/rdd/GeodeRegionRDD.scala | 138 ----------------
.../connector/javaapi/GeodeJavaRegionRDD.scala | 26 ---
.../spark/connector/javaapi/JavaAPIHelper.scala | 53 ------
.../pivotal/geode/spark/connector/package.scala | 69 --------
.../streaming/GeodeDStreamFunctions.scala | 89 ----------
.../spark/connector/streaming/package.scala | 32 ----
.../geode/spark/connector/GeodeConnection.scala | 67 ++++++++
.../spark/connector/GeodeConnectionConf.scala | 73 +++++++++
.../connector/GeodeConnectionManager.scala | 31 ++++
.../spark/connector/GeodeFunctionDeployer.scala | 81 +++++++++
.../spark/connector/GeodeKryoRegistrator.scala | 29 ++++
.../spark/connector/GeodePairRDDFunctions.scala | 140 ++++++++++++++++
.../spark/connector/GeodeRDDFunctions.scala | 120 ++++++++++++++
.../connector/GeodeSQLContextFunctions.scala | 42 +++++
.../connector/GeodeSparkContextFunctions.scala | 39 +++++
.../internal/DefaultGeodeConnection.scala | 164 +++++++++++++++++++
.../DefaultGeodeConnectionManager.scala | 77 +++++++++
.../connector/internal/LocatorHelper.scala | 135 +++++++++++++++
.../StructStreamingResultCollector.scala | 152 +++++++++++++++++
.../connector/internal/oql/QueryParser.scala | 58 +++++++
.../spark/connector/internal/oql/QueryRDD.scala | 83 ++++++++++
.../internal/oql/QueryResultCollector.scala | 69 ++++++++
.../connector/internal/oql/RDDConverter.scala | 40 +++++
.../connector/internal/oql/RowBuilder.scala | 38 +++++
.../connector/internal/oql/SchemaBuilder.scala | 73 +++++++++
.../internal/oql/UndefinedSerializer.scala | 46 ++++++
.../connector/internal/rdd/GeodeJoinRDD.scala | 67 ++++++++
.../internal/rdd/GeodeOuterJoinRDD.scala | 69 ++++++++
.../internal/rdd/GeodeRDDPartition.scala | 36 ++++
.../internal/rdd/GeodeRDDPartitioner.scala | 59 +++++++
.../internal/rdd/GeodeRDDPartitionerImpl.scala | 89 ++++++++++
.../connector/internal/rdd/GeodeRDDWriter.scala | 82 ++++++++++
.../connector/internal/rdd/GeodeRegionRDD.scala | 138 ++++++++++++++++
.../connector/javaapi/GeodeJavaRegionRDD.scala | 26 +++
.../spark/connector/javaapi/JavaAPIHelper.scala | 53 ++++++
.../apache/geode/spark/connector/package.scala | 69 ++++++++
.../streaming/GeodeDStreamFunctions.scala | 89 ++++++++++
.../spark/connector/streaming/package.scala | 32 ++++
64 files changed, 2366 insertions(+), 2366 deletions(-)
----------------------------------------------------------------------
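For connector users, the net effect of this rename is a one-line change wherever the connector is imported. A minimal before/after sketch (the application code itself is hypothetical, not part of this commit):

    // before this commit
    // import io.pivotal.geode.spark.connector._

    // after this commit
    import org.apache.geode.spark.connector._
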
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
deleted file mode 100644
index 6c1df67..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.apache.geode.cache.execute.ResultCollector
-import org.apache.geode.cache.query.Query
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
-
-
-trait GeodeConnection {
-
- /**
- * Validate region existence and key/value type constraints, throw RuntimeException
- * if region does not exist or key and/or value type do(es) not match.
- * @param regionPath the full path of region
- */
- def validateRegion[K, V](regionPath: String): Unit
-
- /**
- * Get Region proxy for the given region
- * @param regionPath the full path of region
- */
- def getRegionProxy[K, V](regionPath: String): Region[K, V]
-
- /**
- * Retrieve region meta data for the given region.
- * @param regionPath: the full path of the region
- * @return Some[RegionMetadata] if region exists, None otherwise
- */
- def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
-
- /**
- * Retrieve region data for the given region and bucket set
- * @param regionPath: the full path of the region
- * @param whereClause: the set of bucket IDs
- * @param split: Geode RDD Partition instance
- */
- def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
-
- def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
- /**
- * Create a geode OQL query
- * @param queryString Geode OQL query string
- */
- def getQuery(queryString: String): Query
-
- /** Close the connection */
- def close(): Unit
-}
-
-
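GeodeConnection is the low-level handle the rest of the connector builds on. A minimal usage sketch against the trait above, assuming a hypothetical region named "customers" and a locator on localhost:

    import org.apache.geode.spark.connector.{GeodeConnection, GeodeConnectionConf}

    val conn: GeodeConnection = GeodeConnectionConf("localhost[10334]").getConnection
    conn.getRegionMetadata[String, String]("customers") match {
      case Some(md) => println(s"region metadata: $md")
      case None     => println("region not found")
    }
    conn.close()
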
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
deleted file mode 100644
index 38d9e07..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.apache.spark.SparkConf
-import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
-
-/**
- * Stores configuration of a connection to Geode cluster. It is serializable and can
- * be safely sent over network.
- *
- * @param locators Geode locator host:port pairs, the default is (localhost,10334)
- * @param geodeProps The initial geode properties to be used.
- * @param connectionManager GeodeConnectionFactory instance
- */
-class GeodeConnectionConf(
- val locators: Seq[(String, Int)],
- val geodeProps: Map[String, String] = Map.empty,
- connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
- ) extends Serializable {
-
- /** require at least 1 pair of (host,port) */
- require(locators.nonEmpty)
-
- def getConnection: GeodeConnection = connectionManager.getConnection(this)
-
-}
-
-object GeodeConnectionConf {
-
- /**
- * create GeodeConnectionConf object based on locator string and optional GeodeConnectionFactory
- * @param locatorStr Geode cluster locator string
- * @param connectionManager GeodeConnection factory
- */
- def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
- (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
- new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
- }
-
- /**
- * create GeodeConnectionConf object based on SparkConf. Note that an implicit can
- * be used to control which GeodeConnectionManager instance to use if desired
- * @param conf a SparkConf instance
- */
- def apply(conf: SparkConf): GeodeConnectionConf = {
- val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
- throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
- // SparkConf only holds properties whose key starts with "spark.". In order to
- // put geode properties in SparkConf, all geode properties are prefixed with
- // "spark.geode.". This prefix is removed before the properties are put in `geodeProps`
- val prefix = "spark.geode."
- val geodeProps = conf.getAll.filter {
- case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
- }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
- apply(locatorStr, geodeProps)
- }
-
-}
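The two `apply` overloads above give two ways to build a configuration. A short sketch with hypothetical hosts and properties (GeodeLocatorPropKey comes from the connector package object):

    import org.apache.geode.spark.connector._

    // from a locator string ("host[port]" or "host:port" form)
    val connConf1 = GeodeConnectionConf("host1.mydomain.com[10334],host2.mydomain.com[10334]")

    // from SparkConf: the locator key plus any "spark.geode."-prefixed Geode properties
    val sparkConf = new org.apache.spark.SparkConf()
      .set(GeodeLocatorPropKey, "host1.mydomain.com[10334]")
      .set("spark.geode.log-level", "warning")   // becomes Geode property "log-level"
    val connConf2 = GeodeConnectionConf(sparkConf)
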
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
deleted file mode 100644
index bf678f0..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-/**
- * GeodeConnectionFactory provides a common interface that manages Geode
- * connections, and it's serializable. Each factory instance will handle
- * connection instance creation and connection pool management.
- */
-trait GeodeConnectionManager extends Serializable {
-
- /** get connection for the given connector */
- def getConnection(connConf: GeodeConnectionConf): GeodeConnection
-
- /** close the connection */
- def closeConnection(connConf: GeodeConnectionConf): Unit
-}
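Because the manager is a trait, the pooling strategy is pluggable. A sketch of a non-caching manager that creates a fresh connection per request (illustrative only; it leaves closing each connection to the caller):

    import org.apache.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager}
    import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionFactory

    class NonCachingConnectionManager extends GeodeConnectionManager {
      private val factory = new DefaultGeodeConnectionFactory
      // always create a new connection instead of consulting a cache
      override def getConnection(connConf: GeodeConnectionConf): GeodeConnection =
        factory.newConnection(connConf.locators, connConf.geodeProps)
      override def closeConnection(connConf: GeodeConnectionConf): Unit = ()  // nothing is cached
    }
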
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
deleted file mode 100644
index 6e93b05..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import java.io.File
-import java.net.URL
-import org.apache.commons.httpclient.methods.PostMethod
-import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.spark.Logging
-
-object GeodeFunctionDeployer {
- def main(args: Array[String]) {
- new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
- }
-}
-
-class GeodeFunctionDeployer(val httpClient:HttpClient) extends Logging {
-
- def deploy(host: String, port: Int, jarLocation: String): String =
- deploy(host + ":" + port, jarLocation)
-
- def deploy(host: String, port: Int, jar:File): String =
- deploy(host + ":" + port, jar)
-
- def deploy(jmxHostAndPort: String, jarLocation: String): String =
- deploy(jmxHostAndPort, jarFileHandle(jarLocation))
-
- def deploy(jmxHostAndPort: String, jar: File): String = {
- val urlString = constructURLString(jmxHostAndPort)
- val filePost: PostMethod = new PostMethod(urlString)
- val parts: Array[Part] = new Array[Part](1)
- parts(0) = new FilePart("resources", jar)
- filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
- val status: Int = httpClient.executeMethod(filePost)
- "Deployed Jar with status:" + status
- }
-
- private[connector] def constructURLString(jmxHostAndPort: String) =
- "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
-
- private[connector] def jarFileHandle(jarLocation: String) = {
- val f: File = new File(jarLocation)
- if (!f.exists()) {
- val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
- logInfo(errorMessage)
- throw new RuntimeException(errorMessage)
- }
- f
- }
-
- def commandLineRun(args: Array[String]):Unit = {
- val (hostPort: String, jarFile: String) =
- if (args.length < 2) {
- logInfo("JMX Manager Host and Port (example: localhost:7070):")
- val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
- val jmxHostAndPort = bufferedReader.readLine()
- logInfo("Location of geode-functions.jar:")
- val functionJarLocation = bufferedReader.readLine()
- (jmxHostAndPort, functionJarLocation)
- } else {
- (args(0), args(1))
- }
- val status = deploy(hostPort, jarFile)
- logInfo(status)
- }
-}
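The deployer simply multipart-POSTs the functions jar to the REST endpoint of the cluster's JMX manager. A usage sketch (host, port, and jar path are hypothetical):

    import org.apache.commons.httpclient.HttpClient
    import org.apache.geode.spark.connector.GeodeFunctionDeployer

    val deployer = new GeodeFunctionDeployer(new HttpClient())
    val status = deployer.deploy("localhost", 7070, "/tmp/geode-functions.jar")
    println(status)  // e.g. "Deployed Jar with status:200" on success
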
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
deleted file mode 100644
index 0bf7df5..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import com.esotericsoftware.kryo.Kryo
-import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
-import org.apache.spark.serializer.KryoRegistrator
-import org.apache.geode.cache.query.internal.Undefined
-
-class GeodeKryoRegistrator extends KryoRegistrator{
-
- override def registerClasses(kryo: Kryo): Unit = {
- kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
- }
-}
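The registrator is wired in through Spark's standard Kryo settings. With the post-rename package name, that looks like:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.geode.spark.connector.GeodeKryoRegistrator")
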
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
deleted file mode 100644
index ba5d2df..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.Function
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector._` at the top of your program to
- * use these functions.
- */
-class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
-
- /**
- * Save the RDD of pairs to Geode key-value store without any conversion
- * @param regionPath the full path of the region where the RDD is stored
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGeode(
- regionPath: String,
- connConf: GeodeConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- if (log.isDebugEnabled)
- logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
- else
- logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
- val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
- rdd.sparkContext.runJob(rdd, writer.write _)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this`
- * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
- * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
- * Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K2 the key type of the Geode region
- * @tparam V2 the value type of the Geode region
- * @return RDD[T, V]
- */
- def joinGeodeRegion[K2 <: K, V2](
- regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
- new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` RDD
- * and the Geode `Region[K2, V2]`. The join key from RDD element is generated by
- * `func(K, V) => K2`, and the key from the Geode region is just the key of the
- * key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K2 the key type of the Geode region
- * @tparam V2 the value type of the Geode region
- * @return RDD[(K, V), V2]
- */
- def joinGeodeRegion[K2, V2](
- regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
- new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
-
- /** This version of joinGeodeRegion(...) is just for Java API. */
- private[connector] def joinGeodeRegion[K2, V2](
- regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
- new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None) if no element in the Geode region has key k.
- *
- * @param regionPath the region path of the Geode region
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K2 the key type of the Geode region
- * @tparam V2 the value type of the Geode region
- * @return RDD[ (K, V), Option[V] ]
- */
- def outerJoinGeodeRegion[K2 <: K, V2](
- regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
- new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
- * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K2 the key type of the Geode region
- * @tparam V2 the value type of the Geode region
- * @return RDD[ (K, V), Option[V] ]
- */
- def outerJoinGeodeRegion[K2, V2](
- regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
- new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
- }
-
- /** This version of outerJoinGeodeRegion(...) is just for Java API. */
- private[connector] def outerJoinGeodeRegion[K2, V2](
- regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
- new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
- }
-
- private[connector] def defaultConnectionConf: GeodeConnectionConf =
- GeodeConnectionConf(rdd.sparkContext.getConf)
-
-}
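As the scaladoc above notes, importing the connector package brings these methods onto any RDD of pairs via implicit conversion. A minimal save-and-join sketch (region name and data are hypothetical; `sc` is an existing SparkContext):

    import org.apache.geode.spark.connector._

    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
    pairs.saveToGeode("str_str_region")          // connection conf derived from SparkConf
    val joined = pairs.joinGeodeRegion[String, String]("str_str_region")
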
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
deleted file mode 100644
index 2e5c92a..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.{PairFunction, Function}
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra gemFire functions on non-Pair RDDs through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector._` at the top of your program to
- * use these functions.
- */
-class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
-
- /**
- * Save the non-pair RDD to Geode key-value store.
- * @param regionPath the full path of the region where the RDD is stored
- * @param func the function that converts elements of RDD to key/value pairs
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGeode[K, V](
- regionPath: String,
- func: T => (K, V),
- connConf: GeodeConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- if (log.isDebugEnabled)
- logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
- else
- logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
- val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
- rdd.sparkContext.runJob(rdd, writer.write(func) _)
- }
-
- /** This version of saveToGeode(...) is just for Java API. */
- private[connector] def saveToGeode[K, V](
- regionPath: String,
- func: PairFunction[T, K, V],
- connConf: GeodeConnectionConf,
- opConf: Map[String, String]): Unit = {
- saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` RDD
- * and the Geode `Region[K, V]`. The join key from RDD element is generated by
- * `func(T) => K`, and the key from the Geode region is just the key of the
- * key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v) tuple,
- * where (t) is in `this` RDD and (k, v) is in the Geode region.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generate region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K the key type of the Geode region
- * @tparam V the value type of the Geode region
- * @return RDD[T, V]
- */
- def joinGeodeRegion[K, V](regionPath: String, func: T => K,
- connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] = {
- new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)
- }
-
- /** This version of joinGeodeRegion(...) is just for Java API. */
- private[connector] def joinGeodeRegion[K, V](
- regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = {
- joinGeodeRegion(regionPath, func.call _, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in `this` RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the Geode region, or the pair
- * (t, None) if no element in the Geode region has key `func(t)`.
- *
- * @param regionPath the region path of the Geode region
- * @param func the function that generate region key from RDD element T
- * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
- * @tparam K the key type of the Geode region
- * @tparam V the value type of the Geode region
- * @return RDD[ T, Option[V] ]
- */
- def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K,
- connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
- new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
- }
-
- /** This version of outerJoinGeodeRegion(...) is just for Java API. */
- private[connector] def outerJoinGeodeRegion[K, V](
- regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
- outerJoinGeodeRegion(regionPath, func.call _, connConf)
- }
-
- private[connector] def defaultConnectionConf: GeodeConnectionConf =
- GeodeConnectionConf(rdd.sparkContext.getConf)
-
-}
-
-
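For non-pair RDDs the caller supplies the pairing function. A sketch (region name hypothetical; `sc` is an existing SparkContext):

    import org.apache.geode.spark.connector._

    val nums = sc.parallelize(1 to 100)
    nums.saveToGeode("int_region", (n: Int) => (n.toString, n * 2))   // derive key/value per element
    val joined = nums.outerJoinGeodeRegion[String, Int]("int_region", (n: Int) => n.toString)
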
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
deleted file mode 100644
index 83aab7a..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
-import org.apache.spark.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-/**
- * Provide Geode OQL specific functions
- */
-class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
-
- /**
- * Expose a Geode OQL query result as a DataFrame
- * @param query the OQL query string.
- */
- def geodeOQL(
- query: String,
- connConf: GeodeConnectionConf = GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
- logInfo(s"OQL query = $query")
- val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
- sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
- }
-
- private[connector] def defaultConnectionConf: GeodeConnectionConf =
- GeodeConnectionConf(sqlContext.sparkContext.getConf)
-}
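With the implicit conversion in scope, any OQL query result can be exposed as a DataFrame. A sketch (region and field names are hypothetical; Spark 1.x SQLContext):

    import org.apache.geode.spark.connector._

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.geodeOQL("SELECT c.name, c.address FROM /customers c")
    df.show()
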
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
deleted file mode 100644
index 617cb33..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
-import org.apache.spark.SparkContext
-
-import scala.reflect.ClassTag
-
-/** Provides Geode specific methods on `SparkContext` */
-class GeodeSparkContextFunctions(@transient sc: SparkContext) extends Serializable {
-
- /**
- * Expose a Geode region as a GeodeRDD
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf that can be used to access the region
- * @param opConf use this to specify preferred partitioner
- * and its parameters. The implementation will use it if it's applicable
- */
- def geodeRegion[K: ClassTag, V: ClassTag] (
- regionPath: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf),
- opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] =
- GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
-
-}
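With the same package import, a Geode region can be consumed as an ordinary RDD of pairs. A sketch (region name hypothetical):

    import org.apache.geode.spark.connector._

    val rdd = sc.geodeRegion[String, String]("customers")
    println(rdd.count())
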
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
deleted file mode 100644
index b232712..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import java.net.InetAddress
-
-import org.apache.geode.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
-import org.apache.geode.cache.execute.{FunctionException, FunctionService}
-import org.apache.geode.cache.query.Query
-import org.apache.geode.cache.{Region, RegionService}
-import org.apache.geode.internal.cache.execute.InternalExecution
-import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
-import org.apache.spark.{SparkEnv, Logging}
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.geodefunctions._
-import java.util.{Set => JSet, List => JList }
-
-/**
- * Default GeodeConnection implementation. The instance of this should be
- * created by DefaultGeodeConnectionFactory
- * @param locators pairs of host/port of locators
- * @param gemFireProps The initial geode properties to be used.
- */
-private[connector] class DefaultGeodeConnection (
- locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
- extends GeodeConnection with Logging {
-
- private val clientCache = initClientCache()
-
- /** Register Geode functions to the Geode cluster */
- FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
- FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
-
- private def initClientCache() : ClientCache = {
- try {
- val ccf = getClientCacheFactory
- ccf.create()
- } catch {
- case e: Exception =>
- logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
- throw new RuntimeException(e)
- }
- }
-
- private def getClientCacheFactory: ClientCacheFactory = {
- import io.pivotal.geode.spark.connector.map2Properties
- val ccf = new ClientCacheFactory(gemFireProps)
- ccf.setPoolReadTimeout(30000)
- val servers = LocatorHelper.getAllGeodeServers(locators)
- if (servers.isDefined && servers.get.size > 0) {
- val sparkIp = System.getenv("SPARK_LOCAL_IP")
- val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName
- else InetAddress.getLocalHost.getCanonicalHostName
- val executorId = SparkEnv.get.executorId
- val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, hostName, executorId)
- logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
- logDebug(s"""Init ClientCache: all-severs=${pickedServers.mkString(",")}""")
- pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) }
- } else {
- logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
- locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
- }
- ccf
- }
-
- /** close the clientCache */
- override def close(): Unit =
- if (! clientCache.isClosed) clientCache.close()
-
- /** ----------------------------------------- */
- /** implementation of GeodeConnection trait */
- /** ----------------------------------------- */
-
- override def getQuery(queryString: String): Query =
- clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
-
- override def validateRegion[K, V](regionPath: String): Unit = {
- val md = getRegionMetadata[K, V](regionPath)
- if (! md.isDefined) throw new RuntimeException(s"The region named $regionPath was not found")
- }
-
- def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
- import scala.collection.JavaConversions.setAsJavaSet
- val region = getRegionProxy[K, V](regionPath)
- val set0: JSet[Integer] = Set[Integer](0)
- val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
- exec.setWaitOnExceptionFlag(true)
- try {
- val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
- val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
- logDebug(r.get(0).toString)
- Some(r.get(0))
- } catch {
- case e: FunctionException =>
- if (e.getMessage.contains(s"The region named /$regionPath was not found")) None
- else throw e
- }
- }
-
- def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
- val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
- if (region1 != null) region1
- else DefaultGeodeConnection.regionLock.synchronized {
- val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
- if (region2 != null) region2
- else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath)
- }
- }
-
- override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = {
- val region = getRegionProxy[K, V](regionPath)
- val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
- val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
- val collector = new StructStreamingResultCollector(desc)
- // RetrieveRegionResultCollector[(K, V)]
- import scala.collection.JavaConversions.setAsJavaSet
- val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
- .withBucketFilter(split.bucketSet.map(Integer.valueOf))
- exec.setWaitOnExceptionFlag(true)
- exec.execute(RetrieveRegionFunction.ID)
- collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
- }
-
- override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
- import scala.collection.JavaConversions.setAsJavaSet
- FunctionService.registerFunction(QueryFunction.getInstance())
- val collector = new QueryResultCollector
- val region = getRegionProxy(regionPath)
- val args: Array[String] = Array[String](queryString, bucketSet.toString)
- val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
- .withBucketFilter(bucketSet.map(Integer.valueOf))
- .withArgs(args)
- exec.execute(QueryFunction.ID)
- collector.getResult
- }
-}
-
-private[connector] object DefaultGeodeConnection {
- /** a lock object only used by getRegionProxy...() */
- private val regionLock = new Object
-}
-
-/** The purpose of this class is to make unit testing DefaultGeodeConnectionManager easier */
-class DefaultGeodeConnectionFactory {
-
- def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
- new DefaultGeodeConnection(locators, gemFireProps)
-
-}
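The factory class above exists to ease unit testing, but it also allows direct construction of a connection. A sketch (locator and region are hypothetical):

    import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionFactory

    val conn = new DefaultGeodeConnectionFactory().newConnection(Seq(("localhost", 10334)))
    try conn.validateRegion[String, String]("customers")
    finally conn.close()
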
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
deleted file mode 100644
index eb67cda..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager}
-
-import scala.collection.mutable
-
-/**
- * Default implementation of GeodeConnectionFactory
- */
-class DefaultGeodeConnectionManager extends GeodeConnectionManager {
-
- def getConnection(connConf: GeodeConnectionConf): GeodeConnection =
- DefaultGeodeConnectionManager.getConnection(connConf)
-
- def closeConnection(connConf: GeodeConnectionConf): Unit =
- DefaultGeodeConnectionManager.closeConnection(connConf)
-
-}
-
-object DefaultGeodeConnectionManager {
-
- /** connection cache, keyed by host:port pair */
- private[connector] val connections = mutable.Map[(String, Int), GeodeConnection]()
-
- /**
- * Use the locator host:port pair to look up a cached connection. Create a new
- * connection and add it to the cache `connections` if one does not exist.
- */
- def getConnection(connConf: GeodeConnectionConf)
- (implicit factory: DefaultGeodeConnectionFactory = new DefaultGeodeConnectionFactory): GeodeConnection = {
-
- def getCachedConnection(locators: Seq[(String, Int)]): GeodeConnection = {
- val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
- if (conns.nonEmpty) conns(0) else null
- }
-
- val conn1 = getCachedConnection(connConf.locators)
- if (conn1 != null) conn1
- else connections.synchronized {
- val conn2 = getCachedConnection(connConf.locators)
- if (conn2 != null) conn2
- else {
- val conn3 = factory.newConnection(connConf.locators, connConf.geodeProps)
- connConf.locators.foreach(pair => connections += (pair -> conn3))
- conn3
- }
- }
- }
-
- /**
- * Close the connection and remove it from connection cache.
- * Note: multiple entries may share the same connection, all those entries are removed.
- */
- def closeConnection(connConf: GeodeConnectionConf): Unit = {
- val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
- if (conns.nonEmpty) connections.synchronized {
- conns(0).close()
- connections.retain((k,v) => v != conns(0))
- }
- }
-}
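Since the cache is keyed by locator (host, port) pairs, repeated lookups with the same locators return the same connection instance. A usage sketch (locator hypothetical):

    import org.apache.geode.spark.connector.GeodeConnectionConf
    import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionManager

    val connConf = GeodeConnectionConf("localhost[10334]")
    val c1 = DefaultGeodeConnectionManager.getConnection(connConf)
    val c2 = DefaultGeodeConnectionManager.getConnection(connConf)
    assert(c1 eq c2)  // same cached instance for the same locators
    DefaultGeodeConnectionManager.closeConnection(connConf)  // closes it and evicts its cache entries
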
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
deleted file mode 100644
index 4baa936..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import java.net.InetSocketAddress
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.geode.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
-import org.apache.geode.distributed.internal.ServerLocation
-import org.apache.geode.distributed.internal.tcpserver.TcpClient
-import org.apache.spark.Logging
-
-import scala.util.{Failure, Success, Try}
-
-
-object LocatorHelper extends Logging {
-
- /** valid locator strings are: host[port] and host:port */
- final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
- final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
-
- /** convert single locator string to Try[(host, port)] */
- def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
- locatorStr match {
- case LocatorPattern1(host, domain, port) => Success((host, port.toInt))
- case LocatorPattern2(host, domain, port) => Success((host, port.toInt))
- case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
- }
-
- /**
- * Parse locator strings and returns Seq of (hostname, port) pair.
- * Valid locator string are one or more "host[port]" and/or "host:port"
- * separated by `,`. For example:
- * host1.mydomain.com[8888],host2.mydomain.com[8889]
- * host1.mydomain.com:8888,host2.mydomain.com:8889
- */
- def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
- locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
-
-
- /**
- * Return the list of live Geode servers for the given locators.
- * @param locators locators for the given Geode cluster
- * @param serverGroup optional server group name, default is "" (empty string)
- */
- def getAllGeodeServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
- var result: Option[Seq[(String, Int)]] = None
- locators.find { case (host, port) =>
- try {
- val addr = new InetSocketAddress(host, port)
- val req = new GetAllServersRequest(serverGroup)
- val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
- if (res != null) {
- import scala.collection.JavaConverters._
- val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
- if (servers.size > 0)
- result = Some(servers.asScala.map(e => (e.getHostName, e.getPort)))
- }
- } catch { case e: Exception => logWarning("getAllGeodeServers error", e)
- }
- result.isDefined
- }
- result
- }
-
- /**
- * Pick up at most 3 preferred servers from all available servers based on
- * host name and Spark executor id.
- *
- * This method is used by DefaultGeodeConnection to create ClientCache. Usually
- * one server is enough to initialize ClientCacheFactory, but this provides two
- * backup servers in case the 1st server can't be connected to.
- *
- * @param servers all available servers in the form of (hostname, port) pairs
- * @param hostName the host name of the Spark executor
- * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
- * @return Seq[(hostname, port)] of preferred servers
- */
- def pickPreferredGeodeServers(
- servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {
-
- // pick up `length` items from the Seq, starting at the `start` position.
- // The Seq is treated as a ring, so at most `Seq.size` items can be picked
- def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = {
- val size = math.min(seq.size, length)
- (start until start + size).map(x => seq(x % seq.size))
- }
-
- // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
- val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 }
-
- // algorithm:
- // 1. sort server list
- // 2. split sorted server list into 3 sub-lists a, b, and c:
- // list-a: servers on the given host
- // list-b: servers that are in front of list-a on the sorted server list
- // list-c: servers that are behind list-a on the sorted server list
- // then rotate list-a based on executor id, then create new server list:
- // modified list-a ++ list-c ++ list-b
- // 3. if there's no server on the given host, then create new server list
- // by rotating sorted server list based on executor id.
- // 4. take up to 3 servers from the new server list
- val sortedServers = servers.sorted
- val firstIdx = sortedServers.indexWhere(p => p._1 == hostName)
- val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName)
-
- if (firstIdx < 0) { // no local server
- circularTake(sortedServers, id, 3)
- } else {
- val (seq1, seq2) = sortedServers.splitAt(firstIdx)
- val seq = if (firstIdx == lastIdx) { // one local server
- seq2 ++ seq1
- } else { // multiple local servers
- val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1)
- val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size)
- seq3b ++ seq4 ++ seq1
- }
- circularTake(seq, 0, 3)
- }
- }
-}
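Both accepted locator syntaxes parse to the same (host, port) pairs, and the server-picking rotation can be traced by hand. A sketch of the expected results (hostnames hypothetical):

    import org.apache.geode.spark.connector.internal.LocatorHelper

    LocatorHelper.parseLocatorsString("host1.mydomain.com[8888],host2.mydomain.com:8889")
    // => Seq(("host1.mydomain.com", 8888), ("host2.mydomain.com", 8889))

    val servers = Seq(("h1", 40404), ("h2", 40404), ("h3", 40404))
    LocatorHelper.pickPreferredGeodeServers(servers, "h2", "1")
    // => Seq(("h2", 40404), ("h3", 40404), ("h1", 40404)), the local server h2 comes first
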
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
deleted file mode 100644
index 5139be4..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions
-
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
-import org.apache.geode.DataSerializer
-import org.apache.geode.cache.execute.ResultCollector
-import org.apache.geode.cache.query.internal.types.StructTypeImpl
-import org.apache.geode.cache.query.types.StructType
-import org.apache.geode.distributed.DistributedMember
-import org.apache.geode.internal.{Version, ByteArrayDataInput}
-import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender.
- {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
-
-/**
- * StructStreamingResultCollector and StructStreamingResultSender are paired
- * to transfer a list of `org.apache.geode.cache.query.Struct` results
- * from the Geode server to the Spark Connector (the client of the Geode
- * server) as a stream, i.e., while the sender is still sending results, the
- * collector can start processing the chunks that have already arrived,
- * without waiting for the full result set to become available.
- */
-class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
-
- /** the constructor that provides a default `desc` (description) */
- def this() = this("StructStreamingResultCollector")
-
- private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
- var structType: StructType = null
-
- /** ------------------------------------------ */
- /** ResultCollector interface implementations */
- /** ------------------------------------------ */
-
- override def getResult: Iterator[Array[Object]] = resultIterator
-
- override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] =
- throw new UnsupportedOperationException()
-
- /** addResult queues a chunk; arrays with no payload (size <= 1) are ignored */
- override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
- if (chunk != null && chunk.size > 1) {
- this.queue.add(chunk)
- // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
- }
-
- /** endResults adds the special `Array.empty` marker to the queue to signal the end of data */
- override def endResults(): Unit = this.queue.add(Array.empty)
-
- override def clearResults(): Unit = this.queue.clear()
-
- /** ------------------------------------------ */
- /** Internal methods */
- /** ------------------------------------------ */
-
- def getResultType: StructType = {
- // trigger lazy resultIterator initialization if necessary
- if (structType == null) resultIterator.hasNext
- structType
- }
-
- /**
- * Note: The data is sent in chunks, and each chunk contains multiple
- * records. So the result iterator is an iterator of iterators: the outer
- * iterator (I) walks the chunks, and for each chunk an inner iterator (II)
- * walks its records.
- */
- private lazy val resultIterator = new Iterator[Array[Object]] {
-
- private var currentIterator: Iterator[Array[Object]] = nextIterator()
-
- override def hasNext: Boolean = {
- if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
- currentIterator.hasNext
- }
-
- /** Note: make sure to call `hasNext` first so that `currentIterator` is advanced */
- override def next(): Array[Object] = currentIterator.next()
- }
-
- /** get the iterator for the next chunk of data */
- private def nextIterator(): Iterator[Array[Object]] = {
- val chunk: Array[Byte] = queue.take
- if (chunk.isEmpty) {
- Iterator.empty
- } else {
- val input = new ByteArrayDataInput()
- input.initialize(chunk, Version.CURRENT)
- val chunkType = input.readByte()
- // println(s"chunk type $chunkType")
- chunkType match {
- case TYPE_CHUNK =>
- if (structType == null)
- structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
- nextIterator()
- case DATA_CHUNK =>
- // require(structType != null && structType.getFieldNames.length > 0)
- if (structType == null) structType = StructStreamingResultSender.KeyValueType
- chunkToIterator(input, structType.getFieldNames.length)
- case ERROR_CHUNK =>
- val error = DataSerializer.readObject(input).asInstanceOf[Exception]
- errorPropagationIterator(error)
- case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
- }
- }
- }
-
- /** create an iterator that propagates the sender's exception */
- private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
- val re = new RuntimeException(ex)
- override def hasNext: Boolean = throw re
- override def next(): Array[Object] = throw re
- }
-
- /** convert a chunk of data to an iterator */
- private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
- override def hasNext: Boolean = input.available() > 0
- val tmpInput = new ByteArrayDataInput()
- override def next(): Array[Object] =
- (0 until rowSize).map { ignore =>
- val b = input.readByte()
- b match {
- case SER_DATA =>
- val arr: Array[Byte] = DataSerializer.readByteArray(input)
- tmpInput.initialize(arr, Version.CURRENT)
- DataSerializer.readObject(tmpInput).asInstanceOf[Object]
- case UNSER_DATA =>
- DataSerializer.readObject(input).asInstanceOf[Object]
- case BYTEARR_DATA =>
- DataSerializer.readByteArray(input).asInstanceOf[Object]
- case _ =>
- throw new RuntimeException(s"unknown data type $b")
- }
- }.toArray
- }
-
-}
-
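
The sender frames every chunk with a leading type byte, and the collector
dispatches on it in `nextIterator` above. A simplified sketch of that dispatch,
using plain java.io streams instead of Geode's ByteArrayDataInput, and stand-in
values for the chunk-type constants (the real ones live in
StructStreamingResultSender):

    import java.io.{ByteArrayInputStream, DataInputStream}

    object ChunkFramingSketch {
      // Stand-in markers; the real values come from StructStreamingResultSender.
      val TYPE_CHUNK: Byte = 1
      val DATA_CHUNK: Byte = 2
      val ERROR_CHUNK: Byte = 3

      // Every chunk carries one type byte followed by its payload.
      def describeChunk(chunk: Array[Byte]): String = {
        val in = new DataInputStream(new ByteArrayInputStream(chunk))
        in.readByte() match {
          case TYPE_CHUNK  => s"schema chunk with ${in.available()} payload bytes"
          case DATA_CHUNK  => s"data chunk with ${in.available()} payload bytes"
          case ERROR_CHUNK => s"error chunk with ${in.available()} payload bytes"
          case b           => throw new RuntimeException(s"unknown chunk type: $b")
        }
      }

      def main(args: Array[String]): Unit = {
        println(describeChunk(Array[Byte](DATA_CHUNK, 10, 20, 30))) // data chunk with 3 payload bytes
      }
    }
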
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
deleted file mode 100644
index 3f6dfad..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import scala.util.parsing.combinator.RegexParsers
-
-class QueryParser extends RegexParsers {
-
- def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ {
- _.toString
- }
-
- val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
-
- val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
-
- val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
-
- val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
-
- val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
-
- def PACKAGE: Parser[String] = """[\w.]+""".r
-
- def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ {
- _.toString
- }
-
- def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ {
- _.toString
- }
-
- def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
-
- def alias: Parser[String] = not(where) ~> """[\w]+""".r
-
- def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
-}
-
-object QueryParser extends QueryParser {
-
- def parseOQL(expression: String) = parseAll(query, expression)
-
-}
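
Assuming the QueryParser above is on the classpath, a quick usage sketch; note
that the parser only extracts the region list (rendered as a string), not a full
AST:

    import io.pivotal.geode.spark.connector.internal.oql.QueryParser

    object QueryParserDemo {
      def main(args: Array[String]): Unit = {
        val result = QueryParser.parseOQL("SELECT * FROM /Customers")
        if (result.successful) println(result.get) // List(/Customers)
        else println(s"parse failed: $result")
      }
    }
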
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
deleted file mode 100644
index 474aa6a..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, ServerSplitsPartitioner}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, SparkContext, Partition}
-import scala.reflect.ClassTag
-
-/**
- * An RDD that reads the result of an OQL query
- *
- * @param sc The SparkContext this RDD is associated with
- * @param queryString The OQL query string
- * @param connConf The GeodeConnectionConf that provides the GeodeConnection
- */
-class QueryRDD[T](@transient sc: SparkContext,
- queryString: String,
- connConf: GeodeConnectionConf)
- (implicit ct: ClassTag[T])
- extends RDD[T](sc, Seq.empty) {
-
- override def getPartitions: Array[Partition] = {
- val conn = connConf.getConnection
- val regionPath = getRegionPathFromQuery(queryString)
- val md = conn.getRegionMetadata(regionPath)
- md match {
- case Some(metadata) =>
- if (metadata.isPartitioned) {
- val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
- logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
- splits
- }
- else {
- logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
- Array[Partition](new GeodeRDDPartition(0, Set.empty))
-
- }
- case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
- }
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- val buckets = split.asInstanceOf[GeodeRDDPartition].bucketSet
- val regionPath = getRegionPathFromQuery(queryString)
- val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
- result match {
- case it: Iterator[T] =>
- logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
- it
- case _ =>
- throw new RuntimeException("Unexpected OQL result: " + result.toString)
- }
- }
-
- private def getRegionPathFromQuery(queryString: String): String = {
- val r = QueryParser.parseOQL(queryString).get
- r match {
- case r: String =>
- val start = r.indexOf("/") + 1
- var end = r.indexOf(")")
- if (r.indexOf(".") > 0) end = math.min(r.indexOf("."), end)
- if (r.indexOf(",") > 0) end = math.min(r.indexOf(","), end)
- val regionPath = r.substring(start, end)
- regionPath
- }
- }
-}
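
The substring logic in getRegionPathFromQuery is easiest to follow on a concrete
parser result; a pure-function sketch with a hypothetical input:

    object RegionPathSketch {
      // Mirrors getRegionPathFromQuery: slice the region name out of the
      // parser's rendering of the region list, e.g. "List(/Customers)".
      def regionPath(parsed: String): String = {
        val start = parsed.indexOf("/") + 1
        var end = parsed.indexOf(")")
        if (parsed.indexOf(".") > 0) end = math.min(parsed.indexOf("."), end)
        if (parsed.indexOf(",") > 0) end = math.min(parsed.indexOf(","), end)
        parsed.substring(start, end)
      }

      def main(args: Array[String]): Unit = {
        println(regionPath("List(/Customers)")) // Customers
      }
    }
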
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
deleted file mode 100644
index 718d816..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
-
-import org.apache.geode.DataSerializer
-import org.apache.geode.cache.execute.ResultCollector
-import org.apache.geode.distributed.DistributedMember
-import org.apache.geode.internal.{Version, ByteArrayDataInput}
-
-class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]] {
-
- private val queue = new LinkedBlockingDeque[Array[Byte]]()
-
- override def getResult = resultIterator
-
- override def getResult(timeout: Long, unit: TimeUnit) = throw new UnsupportedOperationException
-
- override def addResult(memberID: DistributedMember , chunk: Array[Byte]) =
- if (chunk != null && chunk.size > 0) {
- queue.add(chunk)
- }
-
- override def endResults = queue.add(Array.empty)
-
-
- override def clearResults = queue.clear
-
- private lazy val resultIterator = new Iterator[Object] {
- private var currentIterator = nextIterator
- def hasNext = {
- if (!currentIterator.hasNext && currentIterator != Iterator.empty)
- currentIterator = nextIterator
- currentIterator.hasNext
- }
- def next = currentIterator.next
- }
-
- private def nextIterator: Iterator[Object] = {
- val chunk = queue.take
- if (chunk.isEmpty) {
- Iterator.empty
- }
- else {
- val input = new ByteArrayDataInput
- input.initialize(chunk, Version.CURRENT)
- new Iterator[Object] {
- override def hasNext: Boolean = input.available() > 0
- override def next: Object = DataSerializer.readObject(input).asInstanceOf[Object]
- }
- }
- }
-
-}
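
The collector streams by blocking on the queue and treating an empty array as
the end marker; a self-contained sketch of that producer/consumer hand-off,
independent of any Geode types:

    import java.util.concurrent.LinkedBlockingDeque

    object StreamingHandOffSketch {
      def main(args: Array[String]): Unit = {
        val queue = new LinkedBlockingDeque[Array[Byte]]()

        // Producer thread: plays the role of addResult/endResults.
        new Thread(new Runnable {
          def run(): Unit = {
            queue.add(Array[Byte](1, 2))
            queue.add(Array[Byte](3))
            queue.add(Array.empty) // end-of-results marker
          }
        }).start()

        // Consumer: block on take() until the sentinel arrives, like nextIterator.
        Iterator.continually(queue.take()).takeWhile(_.nonEmpty)
          .foreach(chunk => println(chunk.mkString(","))) // prints 1,2 then 3
      }
    }
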
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
deleted file mode 100644
index 6a1611c..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.{BaseRelation, TableScan}
-
-case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
-
- override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
-
- override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
-
-}
-
-object RDDConverter {
-
- def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = {
- sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
- }
-}
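
A usage sketch, assuming a live SparkContext `sc` and a configured
GeodeConnectionConf `connConf` (both hypothetical here); this targets the
Spark 1.x SQLContext API the connector is written against:

    import org.apache.spark.sql.SQLContext
    import io.pivotal.geode.spark.connector.internal.oql.{QueryRDD, RDDConverter}

    val sqlContext = new SQLContext(sc)
    val queryRDD = new QueryRDD[Object](sc, "SELECT * FROM /Customers", connConf)
    val df = RDDConverter.queryRDDToDataFrame(queryRDD, sqlContext)
    df.show()
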
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
deleted file mode 100644
index acbabc1..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.apache.geode.cache.query.internal.StructImpl
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-
-class RowBuilder[T](queryRDD: QueryRDD[T]) {
-
- /**
- * Convert the QueryRDD to an RDD of Rows
- * @return an RDD of Rows
- */
- def toRowRDD(): RDD[Row] = {
- val rowRDD = queryRDD.map(row => {
- row match {
- case si: StructImpl => Row.fromSeq(si.getFieldValues)
- case obj: Object => Row.fromSeq(Seq(obj))
- }
- })
- rowRDD
- }
-}
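
The two cases above both reduce to Row.fromSeq; a tiny sketch of the mapping
(the field values are made up):

    import org.apache.spark.sql.Row

    object RowMappingSketch {
      def main(args: Array[String]): Unit = {
        // A Struct result becomes a multi-column Row from its field values...
        println(Row.fromSeq(Seq("John", 42)))    // [John,42]
        // ...while any other value becomes a single-column Row.
        println(Row.fromSeq(Seq("plain value"))) // [plain value]
      }
    }
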
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
deleted file mode 100644
index 44972839..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.apache.geode.cache.query.internal.StructImpl
-import org.apache.spark.sql.types._
-import scala.collection.mutable.ListBuffer
-import org.apache.spark.Logging
-
-class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {
-
- val nullStructType = StructType(Nil)
-
- val typeMap:Map[Class[_], DataType] = Map(
- (classOf[java.lang.String], StringType),
- (classOf[java.lang.Integer], IntegerType),
- (classOf[java.lang.Short], ShortType),
- (classOf[java.lang.Long], LongType),
- (classOf[java.lang.Double], DoubleType),
- (classOf[java.lang.Float], FloatType),
- (classOf[java.lang.Boolean], BooleanType),
- (classOf[java.lang.Byte], ByteType),
- (classOf[java.util.Date], DateType),
- (classOf[java.lang.Object], nullStructType)
- )
-
- /**
- * Analyse the QueryRDD to derive the Spark schema
- * @return The schema represented as a Spark StructType
- */
- def toSparkSchema(): StructType = {
- val row = queryRDD.first()
- val tpe = row match {
- case r: StructImpl => constructFromStruct(r)
- case null => StructType(StructField("col1", NullType) :: Nil)
- case default =>
- val value = typeMap.getOrElse(default.getClass(), nullStructType)
- StructType(StructField("col1", value) :: Nil)
- }
- logInfo(s"Schema: $tpe")
- tpe
- }
-
- def constructFromStruct(r:StructImpl) = {
- val names = r.getFieldNames
- val values = r.getFieldValues
- val lb = new ListBuffer[StructField]()
- for (i <- 0 until names.length) {
- val name = names(i)
- val value = values(i)
- val dataType = value match {
- case null => NullType
- case default => typeMap.getOrElse(default.getClass, nullStructType)
- }
- lb += StructField(name, dataType)
- }
- StructType(lb.toSeq)
- }
-}
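
A condensed sketch of the same class-to-DataType lookup, runnable with only
spark-sql on the classpath (the sample field names and values are made up, and
the map is trimmed to three entries):

    import org.apache.spark.sql.types._

    object SchemaSketch {
      val typeMap: Map[Class[_], DataType] = Map(
        classOf[java.lang.String]  -> StringType,
        classOf[java.lang.Integer] -> IntegerType,
        classOf[java.lang.Double]  -> DoubleType)

      // Mirrors constructFromStruct: each (name, sample value) pair becomes a
      // StructField whose type is looked up from the value's runtime class.
      def schemaFor(fields: Seq[(String, Any)]): StructType =
        StructType(fields.map { case (name, value) =>
          StructField(name, value match {
            case null => NullType
            case v    => typeMap.getOrElse(v.getClass, StructType(Nil))
          })
        })

      def main(args: Array[String]): Unit = {
        println(schemaFor(Seq(("name", "John"), ("age", Int.box(42)))))
        // StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
      }
    }
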
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
deleted file mode 100644
index 2809a73..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Output, Input}
-import org.apache.geode.cache.query.QueryService
-import org.apache.geode.cache.query.internal.Undefined
-
-/**
- * A custom Kryo serializer for QueryService.UNDEFINED, i.e.
- * org.apache.geode.cache.query.internal.Undefined, which guarantees that
- * deserialization within Spark yields the singleton Undefined instance.
- */
-class UndefinedSerializer extends Serializer[Undefined] {
-
- def write(kryo: Kryo, output: Output, u: Undefined) {
- //Only serialize a byte for Undefined
- output.writeByte(u.getDSFID)
- }
-
- def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
- //Read DSFID of Undefined
- input.readByte()
- QueryService.UNDEFINED match {
- case null => new Undefined
- case _ =>
- //Avoid calling Undefined constructor again.
- QueryService.UNDEFINED.asInstanceOf[Undefined]
- }
- }
-}
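
A minimal sketch of wiring this serializer into Kryo (the connector presumably
does the equivalent inside the GeodeKryoRegistrator listed in this commit);
registration is a single call:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.geode.cache.query.internal.Undefined

    val kryo = new Kryo()
    kryo.register(classOf[Undefined], new UndefinedSerializer)
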