You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:00 UTC

[25/50] [abbrv] incubator-geode git commit: GEODE-1244: Revert rename of package, directory, project and file rename for geode-spark-connector

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
deleted file mode 100644
index 24fe72e..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import org.apache.spark.Partition
-
-/**
- * This serializable class represents a GeodeRDD partition. Each partition is mapped
- * to one or more buckets of region. The GeodeRDD can materialize the data of the 
- * partition based on all information contained here.
- * @param partitionId partition id, a 0 based number.
- * @param bucketSet region bucket id set for this partition. Set.empty means whole
- *                  region (used for replicated region)
- * @param locations preferred location for this partition                  
- */
-case class GeodeRDDPartition (
-  partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil)
-  extends Partition  {
-  
-  override def index: Int = partitionId
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
deleted file mode 100644
index d960cab..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import org.apache.spark.{Logging, Partition}
-
-import scala.reflect.ClassTag
-
-/**
- * A GeodeRDD partitioner is used to partition the region into multiple RDD partitions.
- */
-trait GeodeRDDPartitioner extends Serializable {
-
-  def name: String
-  
-  /** the function that generates partitions */
-  def partitions[K: ClassTag, V: ClassTag]
-    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition]
-}
-
-object GeodeRDDPartitioner extends Logging {
-
-  /** To add new partitioner, just add it to the following list */
-  final val partitioners: Map[String, GeodeRDDPartitioner] =
-    List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap
-
-  /**
-   * Get a partitioner based on given name, a default partitioner will be returned if there's
-   * no partitioner for the given name. 
-   */
-  def apply(name: String = defaultPartitionedRegionPartitioner.name): GeodeRDDPartitioner = {
-    val p = partitioners.get(name)
-    if (p.isDefined) p.get else {
-      logWarning(s"Invalid preferred partitioner name $name.")
-      defaultPartitionedRegionPartitioner
-    }
-  }
-
-  val defaultReplicatedRegionPartitioner = OnePartitionPartitioner
-
-  val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
deleted file mode 100644
index 4606114..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
-import org.apache.spark.Partition
-import scala.collection.JavaConversions._
-import scala.collection.immutable.SortedSet
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-/** This partitioner maps whole region to one GeodeRDDPartition */
-object OnePartitionPartitioner extends GeodeRDDPartitioner {
-
-  override val name = "OnePartition"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
-    Array[Partition](new GeodeRDDPartition(0, Set.empty))
-}
-
-/**
-  * This partitioner maps whole region to N * M Geode RDD partitions, where M is the number of 
-  * Geode servers that contain the data for the given region. Th default value of N is 1.
-  */
-object ServerSplitsPartitioner extends GeodeRDDPartitioner {
-
-  override val name = "ServerSplits"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-  (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
-    if (md == null) throw new RuntimeException("RegionMetadata is null")
-    val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 }
-    if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
-      Array[Partition](new GeodeRDDPartition(0, Set.empty))
-    else {
-      val map = mapAsScalaMap(md.getServerBucketMap)
-        .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
-        .map { case (srv, set) => (srv.getHostName, set) }
-       doPartitions(map, md.getTotalBuckets, n)
-    }
-  }
-
-  /** Converts server to bucket ID set list to array of RDD partitions */
-  def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
-    : Array[Partition] = {
-
-    // method that calculates the group size for splitting "k" items into "g" groups
-    def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
-
-    // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
-    val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
-
-    // 2. split bucket set of each server into n splits if possible, and server to Seq(server)
-    val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) =>
-      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) }
-
-    // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets
-    val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2}
-
-    // 4. distribute empty bucket IDs to all partitions evenly.
-    //    The empty buckets do not contain data when partitions are created, but they may contain data
-    //    when RDD is materialized, so need to include those bucket IDs in the partitions.
-    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet
-      else srvToSplitedBuckeSet.zipAll(
-        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map
-          { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
-
-    // 5. create array of partitions w/ 0-based index
-    (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map
-      { case (i, (srv, set)) => new GeodeRDDPartition(i, set, srv) }.toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
deleted file mode 100644
index dba15f3..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.geode.spark.connector._
-import org.apache.spark.{Logging, TaskContext}
-
-import scala.collection.Iterator
-import java.util.{HashMap => JMap}
-
-/** This trait provide some common code for pair and non-pair RDD writer */
-private[rdd] abstract class GeodeRDDWriterBase (opConf: Map[String, String]) extends Serializable {
-
-  val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt}
-                  catch { case e: NumberFormatException => RDDSaveBatchSizeDefault }
-
-  def mapDump(map: Map[_, _], num: Int): String = {
-    val firstNum = map.take(num + 1)
-    if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"    
-  }  
-}
-
-/**
- * Writer object that provides write function that saves non-pair RDD partitions to Geode.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GeodeRDDWriter[T, K, V] 
-  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
-  }
-}
-
-
-/**
- * Writer object that provides write function that saves pair RDD partitions to Geode.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GeodePairRDDWriter[K, V]
-  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.batch = $batchSize) are saved to region $regionPath")
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
deleted file mode 100644
index 6980c0f..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import scala.collection.Seq
-import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey}
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
-
-/**
- * This class exposes Geode region as a RDD.
- * @param sc the Spark Context
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
-class GeodeRegionRDD[K, V] private[connector]
-  (@transient sc: SparkContext,
-   val regionPath: String,
-   val connConf: GeodeConnectionConf,
-   val opConf: Map[String, String] = Map.empty,
-   val whereClause: Option[String] = None 
-  ) (implicit ctk: ClassTag[K], ctv: ClassTag[V])
-  extends RDD[(K, V)](sc, Seq.empty) {
-
-  /** validate region existence when GeodeRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  def kClassTag = ctk
-  
-  def vClassTag = ctv
-
-  /**
-   * method `copy` is used by method `where` that creates new immutable
-   * GeodeRDD instance based this instance.
-   */
-  private def copy(
-    regionPath: String = regionPath,
-    connConf: GeodeConnectionConf = connConf,
-    opConf: Map[String, String] = opConf,
-    whereClause: Option[String] = None
-  ): GeodeRegionRDD[K, V] = {
-
-    require(sc != null,
-    """RDD transformation requires a non-null SparkContext. Unfortunately
-      |SparkContext in this GeodeRDD is null. This can happen after 
-      |GeodeRDD has been deserialized. SparkContext is not Serializable,
-      |therefore it deserializes to null. RDD transformations are not allowed
-      |inside lambdas used in other RDD transformations.""".stripMargin )
-
-    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause)
-  }
-
-  /** When where clause is specified, OQL query
-    * `select key, value from /<region-path>.entries where <where clause> `
-    * is used to filter the dataset.
-    */
-  def where(whereClause: Option[String]): GeodeRegionRDD[K, V] = {
-    if (whereClause.isDefined) copy(whereClause = whereClause)
-    else this
-  }
-
-  /** this version is for Java API that doesn't use scala.Option */
-  def where(whereClause: String): GeodeRegionRDD[K, V] = {
-    if (whereClause == null || whereClause.trim.isEmpty) this
-    else copy(whereClause = Option(whereClause.trim))
-  }
-
-  /**
-   * Use preferred partitioner generate partitions. `defaultReplicatedRegionPartitioner`
-   * will be used if it's a replicated region. 
-   */
-  override def getPartitions: Array[Partition] = {
-    val conn = connConf.getConnection
-    val md = conn.getRegionMetadata[K, V](regionPath)
-    md match {
-      case None => throw new RuntimeException(s"region $regionPath was not found.")
-      case Some(data) =>
-        logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
-        val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
-        val splits = p.partitions[K, V](conn, data, opConf)
-        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n  ${splits.mkString("\n  ")}""")
-        splits
-    }
-  }
-
-  /**
-   * provide preferred location(s) (host name(s)) of the given partition. 
-   * Only some partitioner implementation(s) provides this info, which is
-   * useful when Spark cluster and Geode cluster share some hosts.
-   */
-  override def getPreferredLocations(split: Partition) =
-    split.asInstanceOf[GeodeRDDPartition].locations
-
-  /**
-   * Get preferred partitioner. return `defaultPartitionedRegionPartitioner` if none
-   * preference is specified. 
-   */
-  private def preferredPartitioner = 
-    GeodeRDDPartitioner(opConf.getOrElse(
-      PreferredPartitionerPropKey, GeodeRDDPartitioner.defaultPartitionedRegionPartitioner.name))
-
-  /** materialize a RDD partition */
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val partition = split.asInstanceOf[GeodeRDDPartition]
-    logDebug(s"compute RDD id=${this.id} partition $partition")
-    connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition)
-    // new InterruptibleIterator(context, split.asInstanceOf[GeodeRDDPartition[K, V]].iterator)
-  }
-}
-
-object GeodeRegionRDD {
-
-  def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String,
-    connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-    : GeodeRegionRDD[K, V] =
-    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
deleted file mode 100644
index f859173..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi
-
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
-import org.apache.spark.api.java.JavaPairRDD
-
-class GeodeJavaRegionRDD[K, V](rdd: GeodeRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) {
-  
-  def where(whereClause: String): GeodeJavaRegionRDD[K, V] = new GeodeJavaRegionRDD(rdd.where(whereClause))
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
deleted file mode 100644
index ffa6195..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi
-
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream}
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-/**
- *  A helper class to make it possible to access components written in Scala from Java code.
- */
-private[connector] object JavaAPIHelper {
-
-  /** Returns a `ClassTag` of a given runtime class. */
-  def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   * see JavaSparkContext.fakeClassTag in Spark for more info.
-   */
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-  /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */
-  def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] =
-    Map(props.toSeq: _*)
-
-  /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */
-  def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] =
-    JavaPairRDD.fromJavaRDD(rdd)
-
-  /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */
-  def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
-    JavaPairDStream.fromJavaDStream(ds)
-
-  /** an empty Map[String, String] for default opConf **/
-  val emptyStrStrMap: Map[String, String] = Map.empty
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
deleted file mode 100644
index 6f9a780..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark
-
-import io.pivotal.geode.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.reflect.ClassTag
-
-/**
- * The root package of Geode connector for Apache Spark.
- * Provides handy implicit conversions that add geode-specific
- * methods to `SparkContext` and `RDD`.
- */
-package object connector {
-
-  /** constants */
-  final val GeodeLocatorPropKey = "spark.geode.locators"
-  // partitioner related keys and values
-  final val PreferredPartitionerPropKey = "preferred.partitioner"
-  final val NumberPartitionsPerServerPropKey = "number.partitions.per.server"
-  final val OnePartitionPartitionerName = OnePartitionPartitioner.name
-  final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
-
-  final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
-  final val RDDSaveBatchSizeDefault = 10000
-  
-  /** implicits */
-  
-  implicit def toSparkContextFunctions(sc: SparkContext): GeodeSparkContextFunctions =
-    new GeodeSparkContextFunctions(sc)
-
-  implicit def toSQLContextFunctions(sqlContext: SQLContext): GeodeSQLContextFunctions =
-    new GeodeSQLContextFunctions(sqlContext)
-
-  implicit def toGeodePairRDDFunctions[K: ClassTag, V: ClassTag]
-    (self: RDD[(K, V)]): GeodePairRDDFunctions[K, V] = new GeodePairRDDFunctions(self)
-
-  implicit def toGeodeRDDFunctions[T: ClassTag]
-    (self: RDD[T]): GeodeRDDFunctions[T] = new GeodeRDDFunctions(self)
-
-  /** utility implicits */
-  
-  /** convert Map[String, String] to java.util.Properties */
-  implicit def map2Properties(map: Map[String,String]): java.util.Properties =
-    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
-
-  /** internal util methods */
-  
-  private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n  "): String =
-    rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
deleted file mode 100644
index 4d46429..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.streaming
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodePairRDDWriter, GeodeRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Extra geode functions on DStream of non-pair elements through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GeodeDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging {
-
-  /**
-   * Save the DStream of non-pair elements to Geode key-value store.
-   * @param regionPath the full path of region that the DStream is stored
-   * @param func the function that converts elements of the DStream to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode[K, V](
-      regionPath: String, 
-      func: T => (K, V), 
-      connConf: GeodeConnectionConf = defaultConnectionConf, 
-      opConf: Map[String, String] = Map.empty): Unit = {
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
-    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
-    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _))
-  }
-
-  /** this version of saveToGeode is just for Java API */
-  def saveToGeode[K, V](
-      regionPath: String,
-      func: PairFunction[T, K, V],
-      connConf: GeodeConnectionConf,
-      opConf: Map[String, String] ): Unit = {
-    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(dstream.context.sparkContext.getConf)
-}
-
-
-/**
- * Extra geode functions on DStream of (key, value) pairs through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GeodePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
-
-  /**
-   * Save the DStream of pairs to Geode key-value store without any conversion
-   * @param regionPath the full path of region that the DStream is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode(
-      regionPath: String, 
-      connConf: GeodeConnectionConf = defaultConnectionConf, 
-      opConf: Map[String, String] = Map.empty): Unit = {
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
-    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
-    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(dstream.context.sparkContext.getConf)
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
deleted file mode 100644
index 0d1f1eb..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Provides handy implicit conversions that add gemfire-specific methods to `DStream`.
- */
-package object streaming {
-
-  implicit def toGeodeDStreamFunctions[T](ds: DStream[T]): GeodeDStreamFunctions[T] =
-    new GeodeDStreamFunctions[T](ds)
-
-  implicit def toGeodePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GeodePairDStreamFunctions[K, V] =
-    new GeodePairDStreamFunctions[K, V](ds)
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java b/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
deleted file mode 100644
index 142907e..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector;
-
-import io.pivotal.geode.spark.connector.javaapi.*;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.SQLContext;
-//import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.dstream.DStream;
-import org.junit.Test;
-import org.scalatest.junit.JUnitSuite;
-import scala.Function1;
-import scala.Function2;
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.collection.mutable.LinkedList;
-import scala.reflect.ClassTag;
-
-import static org.junit.Assert.*;
-import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-public class JavaAPITest extends JUnitSuite {
-
-  @SuppressWarnings( "unchecked" )
-  public Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> createCommonMocks() {
-    SparkContext mockSparkContext = mock(SparkContext.class);
-    GeodeConnectionConf mockConnConf = mock(GeodeConnectionConf.class);
-    GeodeConnection mockConnection = mock(GeodeConnection.class);
-    when(mockConnConf.getConnection()).thenReturn(mockConnection);
-    when(mockConnConf.locators()).thenReturn(new LinkedList());
-    return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection);
-  }
-  
-  @Test
-  public void testSparkContextFunction() throws Exception {
-    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
-    GeodeJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1());
-    assertTrue(tuple3._1() == wrapper.sc);
-    String regionPath = "testregion";
-    JavaPairRDD<String, String> rdd = wrapper.geodeRegion(regionPath, tuple3._2());
-    verify(tuple3._3()).validateRegion(regionPath);
-  }
-
-  @Test
-  public void testJavaSparkContextFunctions() throws Exception {
-    SparkContext mockSparkContext = mock(SparkContext.class);
-    JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class);
-    when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext);
-    GeodeJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext);
-    assertTrue(mockSparkContext == wrapper.sc);
-  }
-  
-  @Test
-  @SuppressWarnings( "unchecked" )
-  public void testJavaPairRDDFunctions() throws Exception {
-    JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class);
-    RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class);
-    when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD);
-    GeodeJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD);
-    assertTrue(mockTuple2RDD == wrapper.rddf.rdd());
-
-    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
-    when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1());
-    String regionPath = "testregion";
-    wrapper.saveToGeode(regionPath, tuple3._2());
-    verify(mockTuple2RDD, times(1)).sparkContext();
-    verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class));
-  }
-
-  @Test
-  @SuppressWarnings( "unchecked" )
-  public void testJavaRDDFunctions() throws Exception {
-    JavaRDD<String> mockJavaRDD = mock(JavaRDD.class);
-    RDD<String> mockRDD = mock(RDD.class);
-    when(mockJavaRDD.rdd()).thenReturn(mockRDD);
-    GeodeJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD);
-    assertTrue(mockRDD == wrapper.rddf.rdd());
-
-    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
-    when(mockRDD.sparkContext()).thenReturn(tuple3._1());
-    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
-    String regionPath = "testregion";
-    wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2());
-    verify(mockRDD, times(1)).sparkContext();
-    verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class));
-  }
-
-  @Test
-  @SuppressWarnings( "unchecked" )
-  public void testJavaPairDStreamFunctions() throws Exception {
-    JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class);
-    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
-    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
-    GeodeJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
-    assertTrue(mockDStream == wrapper.dsf.dstream());
-
-    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
-    String regionPath = "testregion";
-    wrapper.saveToGeode(regionPath, tuple3._2());
-    verify(tuple3._2()).getConnection();
-    verify(tuple3._3()).validateRegion(regionPath);
-    verify(mockDStream).foreachRDD(any(Function1.class));
-  }
-
-  @Test
-  @SuppressWarnings( "unchecked" )
-  public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception {
-    JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class);
-    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
-    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
-    GeodeJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream));
-    assertTrue(mockDStream == wrapper.dsf.dstream());
-  }
-
-  @Test
-  @SuppressWarnings( "unchecked" )
-  public void testJavaDStreamFunctions() throws Exception {
-    JavaDStream<String> mockJavaDStream = mock(JavaDStream.class);
-    DStream<String> mockDStream = mock(DStream.class);
-    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
-    GeodeJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
-    assertTrue(mockDStream == wrapper.dsf.dstream());
-
-    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
-    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
-    String regionPath = "testregion";
-    wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2());
-    verify(tuple3._2()).getConnection();
-    verify(tuple3._3()).validateRegion(regionPath);
-    verify(mockDStream).foreachRDD(any(Function1.class));
-  }
-
-  @Test
-  public void testSQLContextFunction() throws Exception {
-    SQLContext mockSQLContext = mock(SQLContext.class);
-    GeodeJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext);
-    assertTrue(wrapper.scf.getClass() == GeodeSQLContextFunctions.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
deleted file mode 100644
index 4e45dc2..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import org.apache.commons.httpclient.HttpClient
-import java.io.File
-
-
-class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
-  val mockHttpClient: HttpClient = mock[HttpClient]
-    
-  test("jmx url creation") {
-    val jmxHostAndPort = "localhost:7070"
-    val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    val urlString = gfd.constructURLString(jmxHostAndPort)
-    assert(urlString === expectedUrlString)
-  }
-    
-  test("missing jar file") {
-    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
-  }
-  
-  test("deploy with missing jar") {
-    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
-    intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
-  }
-  
-  test("successful mocked deploy") {
-    val gfd = new GeodeFunctionDeployer(mockHttpClient);
-    val jar = new File("README.md");
-    assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
-  }
-  
-
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
deleted file mode 100644
index 798912c..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-
-class DefaultGeodeConnectionManagerTest extends FunSuite  with Matchers with MockitoSugar {
-
-  test("DefaultGeodeConnectionFactory get/closeConnection") {
-    // note: connConf 1-4 share the same set of locators
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
-    val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
-    val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
-    val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333)))
-
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    val mockConn1 = mock[DefaultGeodeConnection]
-    val mockConn2 = mock[DefaultGeodeConnection]
-    when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
-    when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
-
-    assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
-    // note: following 3 lines do not trigger connFactory.newConnection(...)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
-
-    // connFactory.newConnection(...) were invoked only twice
-    verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
-    verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
-    assert(DefaultGeodeConnectionManager.connections.size == 3)
-
-    DefaultGeodeConnectionManager.closeConnection(connConf1)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-    DefaultGeodeConnectionManager.closeConnection(connConf5)
-    assert(DefaultGeodeConnectionManager.connections.isEmpty)
-  }
-  
-  test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") {
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
-    intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) }
-    verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
-  }
-
-  test("DefaultGeodeConnectionFactory close() w/ non-exist connection") {
-    val props: Map[String, String] = Map.empty
-    val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory]
-    val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234)))
-    val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678)))
-    val mockConn1 = mock[DefaultGeodeConnection]
-    when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
-    assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-    // connection does not exists in the connection manager
-    DefaultGeodeConnectionManager.closeConnection(connConf2)
-    assert(DefaultGeodeConnectionManager.connections.size == 1)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
deleted file mode 100644
index f2303e7..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions
-
-import com.gemstone.gemfire.DataSerializer
-import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender}
-import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
-import com.gemstone.gemfire.cache.query.types.ObjectType
-import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
-import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-
-class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter  {
-
-  /** 
-    * A test ResultSender that connects struct ResultSender and ResultCollector 
-    * Note: this test ResultSender has to copy the data (byte array) since the
-    *       StructStreamingResultSender will reuse the byte array.
-    */
-  class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
-
-    var finishedNum = 0
-    
-    override def sendResult(result: Object): Unit =
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-
-    /** exception should be sent via lastResult() */
-    override def sendException(throwable: Throwable): Unit = 
-      throw new UnsupportedOperationException("sendException is not supported.")
-
-    override def lastResult(result: Object): Unit = {
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-      this.synchronized {
-        finishedNum += 1
-        if (finishedNum == num)
-          collector.endResults()
-      }
-    }
-  }
-
-  /** common variables */
-  var collector: StructStreamingResultCollector = _
-  var baseSender: LocalResultSender = _
-  /** common types */
-  val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
-  val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
-  val OneColType = new StructTypeImpl(Array("value"), Array(objType))
-
-  before {
-    collector = new StructStreamingResultCollector
-    baseSender = new LocalResultSender(collector, 1)
-  }
-  
-  test("transfer simple data") {
-    verifySimpleTransfer(sendDataType = true)
-  }
-
-  test("transfer simple data with no type info") {
-    verifySimpleTransfer(sendDataType = false)
-  }
-
-  def verifySimpleTransfer(sendDataType: Boolean): Unit = {
-    val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
-    val dataType = if (sendDataType) TwoColType else null
-    new StructStreamingResultSender(baseSender, dataType , iter).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(TwoColType.equals(collector.getResultType))
-    val iter2 = collector.getResult
-    (0 to 9).foreach { i =>
-      assert(iter2.hasNext)
-      val o = iter2.next()
-      assert(o.size == 2)
-      assert(o(0).asInstanceOf[Int] == i)
-      assert(o(1).asInstanceOf[String] == i.toString * 5)
-    }
-    assert(! iter2.hasNext)
-  }
-
-  
-  /**
-   * A test iterator that generate integer data
-   * @param start the 1st value
-   * @param n number of integers generated
-   * @param genExcp generate Exception if true. This is used to test exception handling.
-   */
-  def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
-    new Iterator[Array[Object]] {
-      val max = if (genExcp) start + n else start + n - 1
-      var index: Int = start - 1
-
-      override def hasNext: Boolean = if (index < max) true else false
-
-      override def next(): Array[Object] =
-        if (index < (start + n - 1)) {
-          index += 1
-          Array(index.asInstanceOf[Object])
-        } else throw new RuntimeException("simulated error")
-    }
-  }
-
-  test("transfer data with 0 row") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(collector.getResultType == null)
-    val iter = collector.getResult
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    (1 to 10000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(o(0).asInstanceOf[Int] == i)
-    }
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows with 2 sender") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 300
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
-    Await.result(sender1, 1.seconds)
-    Await.result(sender2, 1.seconds)
-
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    val set = scala.collection.mutable.Set[Int]()
-    (1 to total).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(! set.contains(o(0).asInstanceOf[Int]))
-      set.add(o(0).asInstanceOf[Int])
-    }
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows with 2 sender with error") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 1000
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
-    Await.result(sender1, 1 seconds)
-    Await.result(sender2, 1 seconds)
-
-    // println("type: " + collector.getResultType.toString)
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    val set = scala.collection.mutable.Set[Int]()
-    intercept[RuntimeException] {
-      (1 to total).foreach { i =>
-        assert(iter.hasNext)
-        val o = iter.next()
-        assert(o.size == 1)
-        assert(! set.contains(o(0).asInstanceOf[Int]))
-        set.add(o(0).asInstanceOf[Int])
-      }
-    }
-    // println(s"rows received: ${set.size}")
-  }
-
-  test("transfer data with Exception") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
-    // println("type: " + collector.getResultType.toString)
-    val iter = collector.getResult
-    intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
-  }
-
-  def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
-    intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
-
-  test("transfer string pair data with 200 rows") {
-    new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
-    // println("type: " + collector.getResultType.toString)
-    assert(TwoColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
-    (1 to 1000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 2)
-      assert(o(0) == s"key-$i")
-      assert(o(1) == s"value-$i")
-    }
-    assert(! iter.hasNext)
-  }
-
-  /**
-   * Usage notes: There are 3 kinds of data to transfer:
-   * (1) object, (2) byte array of serialized object, and (3) byte array
-   * this test shows how to handle all of them.
-   */
-  test("DataSerializer usage") {
-    val outBuf = new HeapDataOutputStream(1024, null)
-    val inBuf = new ByteArrayDataInput()
-
-    // 1. a regular object
-    val hello = "Hello World!" * 30
-    // serialize the data
-    DataSerializer.writeObject(hello, outBuf)
-    val bytesHello = outBuf.toByteArray.clone()
-    // de-serialize the data
-    inBuf.initialize(bytesHello, Version.CURRENT)
-    val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello2)
-    
-    // 2. byte array of serialized object
-    // serialize: byte array from `CachedDeserializable`
-    val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
-    outBuf.reset()
-    DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
-    // de-serialize the data in 2 steps
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    inBuf.initialize(bytesHello2, Version.CURRENT)
-    val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello3)
-
-    // 3. byte array
-    outBuf.reset()
-    DataSerializer.writeByteArray(bytesHello, outBuf)
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    assert(bytesHello sameElements bytesHello3)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
deleted file mode 100644
index 54394e8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.oql
-
-import org.scalatest.FunSuite
-
-class QueryParserTest extends FunSuite {
-
-  test("select * from /r1") {
-    val r = QueryParser.parseOQL("select * from /r1").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select c2 from /r1") {
-    val r = QueryParser.parseOQL("select c2 from /r1").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select key, value from /r1.entries") {
-    val r = QueryParser.parseOQL("select key, value from /r1.entries").get
-    assert(r == "List(/r1.entries)")
-  }
-
-  test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") {
-    val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get
-    assert(r == "List(/r1)")
-  }
-
-  test("select * from /r1/r2 where c1 >= 200") {
-    val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get
-    assert(r == "List(/r1/r2)")
-  }
-
-  test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") {
-    val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
-    assert(r == "List(/r1/r2, /r3/r4)")
-  }
-
-  test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") {
-    val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get
-    assert(r == "List(/r1/r2)")
-  }
-
-  test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") {
-    val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get
-    assert(r == "List(/root/sub.entries)")
-  }
-
-  test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") {
-    val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get
-    assert(r == "List(/region)")
-  }
-
-  test("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID") {
-    val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID = r2.ID").get
-    assert(r == "List(/QueryRegion1, /QueryRegion2)")
-  }
-
-  test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") {
-    val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get
-    println("r.type=" + r.getClass.getName + " r=" + r)
-    assert(r == "List(/obj_obj_region)")
-  }
-
-  test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") {
-    val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get
-    assert(r == "List(/obj_obj_region, r.positions.values)")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
deleted file mode 100644
index b0464cc..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector._
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.Matchers
-
-class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("implicit map2Properties") {
-    verifyProperties(Map.empty)
-    verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3"))
-  }
-  
-  def verifyProperties(map: Map[String, String]): Unit = {
-    val props: java.util.Properties = map
-    assert(props.size() == map.size)
-    map.foreach(p => assert(props.getProperty(p._1) == p._2))    
-  }
-
-  test("Test Implicit SparkContext Conversion") {
-    val mockSparkContext = mock[SparkContext]
-    val gfscf: GeodeSparkContextFunctions = mockSparkContext
-    assert(gfscf.isInstanceOf[GeodeSparkContextFunctions])
-  }
-
-  test("Test Implicit SQLContext Conversion") {
-    val mockSQLContext = mock[SQLContext]
-    val gfscf: GeodeSQLContextFunctions = mockSQLContext
-    assert(gfscf.isInstanceOf[GeodeSQLContextFunctions])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
deleted file mode 100644
index a3076f4..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import org.apache.spark.SparkConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import io.pivotal.geode.spark.connector._
-
-class GeodeConnectionConfTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("apply(SparkConf) w/ GeodeLocator property and empty geodeProps") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]")
-    val connConf = GeodeConnectionConf(conf)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps.isEmpty)
-  }
-  
-  test("apply(SparkConf) w/ GeodeLocator property and geode properties") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
-    val (propK2, propV2) = ("ack-wait-threshold", "10")
-    val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]")
-                              .set(s"spark.geode.$propK1", propV1).set(s"spark.geode.$propK2", propV2)
-    val connConf = GeodeConnectionConf(conf)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps == Map(propK1 -> propV1, propK2 -> propV2))
-  }
-
-  test("apply(SparkConf) w/o GeodeLocator property") {
-    intercept[RuntimeException] { GeodeConnectionConf(new SparkConf()) }
-  }
-
-  test("apply(SparkConf) w/ invalid GeodeLocator property") {
-    val conf = new SparkConf().set(GeodeLocatorPropKey, "local^host:1234")
-    intercept[Exception] { GeodeConnectionConf(conf) }
-  }
-
-  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non geodeProps") {
-    val (host1, port1) = ("host1", 1234)
-    val connConf = GeodeConnectionConf(s"$host1:$port1")
-    assert(connConf.locators == Seq((host1, port1)))
-    assert(connConf.geodeProps.isEmpty)
-  }
-
-  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non-empty geodeProps") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
-    val (propK2, propV2) = ("ack-wait-threshold", "10")
-    val props = Map(propK1 -> propV1, propK2 -> propV2)
-    val connConf = GeodeConnectionConf(s"$host1:$port1,$host2:$port2", props)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps == props)
-  }
-
-  test("apply(locatorStr, geodeProps) w/ invalid locatorStr") {
-    intercept[Exception] { GeodeConnectionConf("local~host:4321") }
-  }
-
-  test("constructor w/ empty (host,port) pairs") {
-    intercept[IllegalArgumentException] { new GeodeConnectionConf(Seq.empty) }
-  }
-
-  test("getConnection() normal") {
-    implicit val mockFactory = mock[GeodeConnectionManager]
-    val mockConnection = mock[GeodeConnection]
-    when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenReturn(mockConnection)
-    val connConf = GeodeConnectionConf("localhost:1234")
-    assert(connConf.getConnection == mockConnection)
-    verify(mockFactory).getConnection(connConf)
-  }
-
-  test("getConnection() failure") {
-    implicit val mockFactory = mock[GeodeConnectionManager]
-    when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenThrow(new RuntimeException)
-    val connConf = GeodeConnectionConf("localhost:1234")
-    intercept[RuntimeException] { connConf.getConnection }
-    verify(mockFactory).getConnection(connConf)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
deleted file mode 100644
index bcba7e1..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.DStream
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-import scala.reflect.ClassTag
-
-class GeodeDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("test GeodePairDStreamFunctions Implicit") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val mockDStream = mock[DStream[(Int, String)]]
-    // the implicit make the following line valid
-    val pairDStream: GeodePairDStreamFunctions[Int, String] = mockDStream
-    pairDStream shouldBe a[GeodePairDStreamFunctions[_, _]]
-  }
-
-  test("test GeodeDStreamFunctions Implicit") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val mockDStream = mock[DStream[String]]
-    // the implicit make the following line valid
-    val dstream: GeodeDStreamFunctions[String] = mockDStream
-    dstream shouldBe a[GeodeDStreamFunctions[_]]
-  }
-
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
-    : (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
-    val mockConnection = mock[GeodeConnection]
-    val mockConnConf = mock[GeodeConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnConf.locators).thenReturn(Seq.empty)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GeodePairDStreamFunctions.saveToGeode()") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test")
-    val mockDStream = mock[DStream[(String, String)]]
-    mockDStream.saveToGeode(regionPath, mockConnConf)
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
-  }
-
-  test("test GeodeDStreamFunctions.saveToGeode()") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, Int]("test")
-    val mockDStream = mock[DStream[String]]
-    mockDStream.saveToGeode[String, Int](regionPath,  (s: String) => (s, s.length), mockConnConf)
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
deleted file mode 100644
index 96e5f26..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.geode.spark.connector._
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDWriter, GeodePairRDDWriter}
-import org.apache.spark.{TaskContext, SparkContext}
-import org.apache.spark.rdd.RDD
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import collection.JavaConversions._
-import scala.reflect.ClassTag
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-class GeodeRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("test PairRDDFunction Implicit") {
-    import io.pivotal.geode.spark.connector._
-    val mockRDD = mock[RDD[(Int, String)]]
-    // the implicit make the following line valid
-    val pairRDD: GeodePairRDDFunctions[Int, String] = mockRDD
-    pairRDD shouldBe a [GeodePairRDDFunctions[_, _]]
-  }
-  
-  test("test RDDFunction Implicit") {
-    import io.pivotal.geode.spark.connector._
-    val mockRDD = mock[RDD[String]]
-    // the implicit make the following line valid
-    val nonPairRDD: GeodeRDDFunctions[String] = mockRDD
-    nonPairRDD shouldBe a [GeodeRDDFunctions[_]]
-  }
-
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
-    val mockConnection = mock[GeodeConnection]
-    val mockConnConf = mock[GeodeConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
-    // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GeodePairRDDWriter") {
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test")
-    val writer = new GeodePairRDDWriter[String, String](regionPath, mockConnConf)
-    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
-    writer.write(null, data.toIterator)
-    val expectedMap: Map[String, String] = data.toMap
-    verify(mockRegion).putAll(expectedMap)
-  }
-
-  test("test GeodeNonPairRDDWriter") {
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test")
-    val writer = new GeodeRDDWriter[String, Int, String](regionPath, mockConnConf)
-    val data = List("a", "ab", "abc")
-    val f: String => (Int, String) = s => (s.length, s)
-    writer.write(f)(null, data.toIterator)
-    val expectedMap: Map[Int, String] = data.map(f).toMap
-    verify(mockRegion).putAll(expectedMap)
-  }
-  
-  test("test PairRDDFunctions.saveToGeode") {
-    verifyPairRDDFunction(useOpConf = false)
-  }
-
-  test("test PairRDDFunctions.saveToGeode w/ opConf") {
-    verifyPairRDDFunction(useOpConf = true)
-  }
-  
-  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
-    import io.pivotal.geode.spark.connector._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test")
-    val mockRDD = mock[RDD[(String, String)]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = 
-      if (useOpConf) 
-        mockRDD.saveToGeode(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
-      else
-        mockRDD.saveToGeode(regionPath, mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
-    result === Unit
-    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: current implementation make following code not compilable
-    //       so not negative test for this case
-    //  val rdd: RDD[(K, V)] = ...
-    //  rdd.saveToGeode(regionPath, s => (s.length, s))
-  }
-
-  test("test RDDFunctions.saveToGeode") {
-    verifyRDDFunction(useOpConf = false)
-  }
-
-  test("test RDDFunctions.saveToGeode w/ opConf") {
-    verifyRDDFunction(useOpConf = true)
-  }
-  
-  def verifyRDDFunction(useOpConf: Boolean): Unit = {
-    import io.pivotal.geode.spark.connector._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test")
-    val mockRDD = mock[RDD[(String)]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = 
-      if (useOpConf)
-        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
-      else
-        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
-    result === Unit
-    verify(mockSparkContext, times(1)).runJob[String, Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: current implementation make following code not compilable
-    //       so not negative test for this case
-    //  val rdd: RDD[T] = ...   // T is not a (K, V) tuple
-    //  rdd.saveToGeode(regionPath)
-  }
-  
-}