You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:16:49 UTC
[14/50] [abbrv] incubator-geode git commit: GEODE-1244: Package,
directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
deleted file mode 100644
index aaf0fcc..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark
-
-import io.pivotal.gemfire.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.reflect.ClassTag
-
-/**
- * The root package of Gemfire connector for Apache Spark.
- * Provides handy implicit conversions that add gemfire-specific
- * methods to `SparkContext` and `RDD`.
- */
-package object connector {
-
- /** constants */
- final val GemFireLocatorPropKey = "spark.gemfire.locators"
- // partitioner related keys and values
- final val PreferredPartitionerPropKey = "preferred.partitioner"
- final val NumberPartitionsPerServerPropKey = "number.partitions.per.server"
- final val OnePartitionPartitionerName = OnePartitionPartitioner.name
- final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
-
- final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
- final val RDDSaveBatchSizeDefault = 10000
-
- /** implicits */
-
- implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions =
- new GemFireSparkContextFunctions(sc)
-
- implicit def toSQLContextFunctions(sqlContext: SQLContext): GemFireSQLContextFunctions =
- new GemFireSQLContextFunctions(sqlContext)
-
- implicit def toGemfirePairRDDFunctions[K: ClassTag, V: ClassTag]
- (self: RDD[(K, V)]): GemFirePairRDDFunctions[K, V] = new GemFirePairRDDFunctions(self)
-
- implicit def toGemfireRDDFunctions[T: ClassTag]
- (self: RDD[T]): GemFireRDDFunctions[T] = new GemFireRDDFunctions(self)
-
- /** utility implicits */
-
- /** convert Map[String, String] to java.util.Properties */
- implicit def map2Properties(map: Map[String,String]): java.util.Properties =
- (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
-
- /** internal util methods */
-
- private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String =
- rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep)
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
deleted file mode 100644
index 6890ec0..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.streaming
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFirePairRDDWriter, GemFireRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Extra gemFire functions on DStream of non-pair elements through an implicit conversion.
- * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GemFireDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging {
-
- /**
- * Save the DStream of non-pair elements to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the function that converts elements of the DStream to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGemfire[K, V](
- regionPath: String,
- func: T => (K, V),
- connConf: GemFireConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
- logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
- dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _))
- }
-
- /** this version of saveToGemfire is just for Java API */
- def saveToGemfire[K, V](
- regionPath: String,
- func: PairFunction[T, K, V],
- connConf: GemFireConnectionConf,
- opConf: Map[String, String] ): Unit = {
- saveToGemfire[K, V](regionPath, func.call _, connConf, opConf)
- }
-
- private[connector] def defaultConnectionConf: GemFireConnectionConf =
- GemFireConnectionConf(dstream.context.sparkContext.getConf)
-}
-
-
-/**
- * Extra gemFire functions on DStream of (key, value) pairs through an implicit conversion.
- * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GemFirePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
-
- /**
- * Save the DStream of pairs to GemFire key-value store without any conversion
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGemfire(
- regionPath: String,
- connConf: GemFireConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
- logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
- dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
- }
-
- private[connector] def defaultConnectionConf: GemFireConnectionConf =
- GemFireConnectionConf(dstream.context.sparkContext.getConf)
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
deleted file mode 100644
index b475cbb..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Provides handy implicit conversions that add gemfire-specific methods to `DStream`.
- */
-package object streaming {
-
- implicit def toGemFireDStreamFunctions[T](ds: DStream[T]): GemFireDStreamFunctions[T] =
- new GemFireDStreamFunctions[T](ds)
-
- implicit def toGemFirePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GemFirePairDStreamFunctions[K, V] =
- new GemFirePairDStreamFunctions[K, V](ds)
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java b/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
deleted file mode 100644
index 2236b4a..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector;
-
-import io.pivotal.gemfire.spark.connector.javaapi.*;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.SQLContext;
-//import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.dstream.DStream;
-import org.junit.Test;
-import org.scalatest.junit.JUnitSuite;
-import scala.Function1;
-import scala.Function2;
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.collection.mutable.LinkedList;
-import scala.reflect.ClassTag;
-
-import static org.junit.Assert.*;
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-public class JavaAPITest extends JUnitSuite {
-
- @SuppressWarnings( "unchecked" )
- public Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> createCommonMocks() {
- SparkContext mockSparkContext = mock(SparkContext.class);
- GemFireConnectionConf mockConnConf = mock(GemFireConnectionConf.class);
- GemFireConnection mockConnection = mock(GemFireConnection.class);
- when(mockConnConf.getConnection()).thenReturn(mockConnection);
- when(mockConnConf.locators()).thenReturn(new LinkedList());
- return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection);
- }
-
- @Test
- public void testSparkContextFunction() throws Exception {
- Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
- GemFireJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1());
- assertTrue(tuple3._1() == wrapper.sc);
- String regionPath = "testregion";
- JavaPairRDD<String, String> rdd = wrapper.gemfireRegion(regionPath, tuple3._2());
- verify(tuple3._3()).validateRegion(regionPath);
- }
-
- @Test
- public void testJavaSparkContextFunctions() throws Exception {
- SparkContext mockSparkContext = mock(SparkContext.class);
- JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class);
- when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext);
- GemFireJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext);
- assertTrue(mockSparkContext == wrapper.sc);
- }
-
- @Test
- @SuppressWarnings( "unchecked" )
- public void testJavaPairRDDFunctions() throws Exception {
- JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class);
- RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class);
- when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD);
- GemFireJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD);
- assertTrue(mockTuple2RDD == wrapper.rddf.rdd());
-
- Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
- when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1());
- String regionPath = "testregion";
- wrapper.saveToGemfire(regionPath, tuple3._2());
- verify(mockTuple2RDD, times(1)).sparkContext();
- verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class));
- }
-
- @Test
- @SuppressWarnings( "unchecked" )
- public void testJavaRDDFunctions() throws Exception {
- JavaRDD<String> mockJavaRDD = mock(JavaRDD.class);
- RDD<String> mockRDD = mock(RDD.class);
- when(mockJavaRDD.rdd()).thenReturn(mockRDD);
- GemFireJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD);
- assertTrue(mockRDD == wrapper.rddf.rdd());
-
- Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
- when(mockRDD.sparkContext()).thenReturn(tuple3._1());
- PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
- String regionPath = "testregion";
- wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2());
- verify(mockRDD, times(1)).sparkContext();
- verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class));
- }
-
- @Test
- @SuppressWarnings( "unchecked" )
- public void testJavaPairDStreamFunctions() throws Exception {
- JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class);
- DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
- when(mockJavaDStream.dstream()).thenReturn(mockDStream);
- GemFireJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
- assertTrue(mockDStream == wrapper.dsf.dstream());
-
- Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
- String regionPath = "testregion";
- wrapper.saveToGemfire(regionPath, tuple3._2());
- verify(tuple3._2()).getConnection();
- verify(tuple3._3()).validateRegion(regionPath);
- verify(mockDStream).foreachRDD(any(Function1.class));
- }
-
- @Test
- @SuppressWarnings( "unchecked" )
- public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception {
- JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class);
- DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
- when(mockJavaDStream.dstream()).thenReturn(mockDStream);
- GemFireJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream));
- assertTrue(mockDStream == wrapper.dsf.dstream());
- }
-
- @Test
- @SuppressWarnings( "unchecked" )
- public void testJavaDStreamFunctions() throws Exception {
- JavaDStream<String> mockJavaDStream = mock(JavaDStream.class);
- DStream<String> mockDStream = mock(DStream.class);
- when(mockJavaDStream.dstream()).thenReturn(mockDStream);
- GemFireJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
- assertTrue(mockDStream == wrapper.dsf.dstream());
-
- Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
- PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
- String regionPath = "testregion";
- wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2());
- verify(tuple3._2()).getConnection();
- verify(tuple3._3()).validateRegion(regionPath);
- verify(mockDStream).foreachRDD(any(Function1.class));
- }
-
- @Test
- public void testSQLContextFunction() throws Exception {
- SQLContext mockSQLContext = mock(SQLContext.class);
- GemFireJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext);
- assertTrue(wrapper.scf.getClass() == GemFireSQLContextFunctions.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
deleted file mode 100644
index 854fd8f..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import org.apache.commons.httpclient.HttpClient
-import java.io.File
-
-
-class GemFireFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
- val mockHttpClient: HttpClient = mock[HttpClient]
-
- test("jmx url creation") {
- val jmxHostAndPort = "localhost:7070"
- val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
- val gfd = new GemFireFunctionDeployer(mockHttpClient);
- val urlString = gfd.constructURLString(jmxHostAndPort)
- assert(urlString === expectedUrlString)
- }
-
- test("missing jar file") {
- val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
- val gfd = new GemFireFunctionDeployer(mockHttpClient);
- intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
- }
-
- test("deploy with missing jar") {
- val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
- val gfd = new GemFireFunctionDeployer(mockHttpClient);
- intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
- intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
- }
-
- test("successful mocked deploy") {
- val gfd = new GemFireFunctionDeployer(mockHttpClient);
- val jar = new File("README.md");
- assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
deleted file mode 100644
index 0ce9808..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-
-class DefaultGemFireConnectionManagerTest extends FunSuite with Matchers with MockitoSugar {
-
- test("DefaultGemFireConnectionFactory get/closeConnection") {
- // note: connConf 1-4 share the same set of locators
- val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
- val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678)))
- val connConf3 = new GemFireConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
- val connConf4 = new GemFireConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
- val connConf5 = new GemFireConnectionConf(Seq(("host5", 3333)))
-
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory]
- val mockConn1 = mock[DefaultGemFireConnection]
- val mockConn2 = mock[DefaultGemFireConnection]
- when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
- when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
-
- assert(DefaultGemFireConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
- // note: following 3 lines do not trigger connFactory.newConnection(...)
- assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
- assert(DefaultGemFireConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
- assert(DefaultGemFireConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
- assert(DefaultGemFireConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
-
- // connFactory.newConnection(...) were invoked only twice
- verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
- verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
- assert(DefaultGemFireConnectionManager.connections.size == 3)
-
- DefaultGemFireConnectionManager.closeConnection(connConf1)
- assert(DefaultGemFireConnectionManager.connections.size == 1)
- DefaultGemFireConnectionManager.closeConnection(connConf5)
- assert(DefaultGemFireConnectionManager.connections.isEmpty)
- }
-
- test("DefaultGemFireConnectionFactory newConnection(...) throws RuntimeException") {
- val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory]
- when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException())
- intercept[RuntimeException] { DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) }
- verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
- }
-
- test("DefaultGemFireConnectionFactory close() w/ non-exist connection") {
- val props: Map[String, String] = Map.empty
- val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory]
- val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
- val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678)))
- val mockConn1 = mock[DefaultGemFireConnection]
- when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
- assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
- assert(DefaultGemFireConnectionManager.connections.size == 1)
- // connection does not exists in the connection manager
- DefaultGemFireConnectionManager.closeConnection(connConf2)
- assert(DefaultGemFireConnectionManager.connections.size == 1)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
deleted file mode 100644
index ad2b94e..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
-
-import com.gemstone.gemfire.DataSerializer
-import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender}
-import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
-import com.gemstone.gemfire.cache.query.types.ObjectType
-import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
-import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-
-class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter {
-
-  /**
-   * A test ResultSender that connects struct ResultSender and ResultCollector.
-   * Note: this test ResultSender has to copy the data (byte array) since the
-   * StructStreamingResultSender will reuse the byte array.
-   *
-   * @param collector the collector every received chunk is forwarded to
-   * @param num number of `lastResult()` calls after which `endResults()` is
-   *            invoked on the collector
-   */
-  class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
-
-    var finishedNum = 0
-
-    override def sendResult(result: Object): Unit =
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-
-    /** exception should be sent via lastResult() */
-    override def sendException(throwable: Throwable): Unit =
-      throw new UnsupportedOperationException("sendException is not supported.")
-
-    override def lastResult(result: Object): Unit = {
-      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
-      // the two-sender tests call this from different Futures, hence the lock
-      this.synchronized {
-        finishedNum += 1
-        if (finishedNum == num)
-          collector.endResults()
-      }
-    }
-  }
-
-  /** common variables, re-created before every test */
-  var collector: StructStreamingResultCollector = _
-  var baseSender: LocalResultSender = _
-  /** common types */
-  val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
-  val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
-  val OneColType = new StructTypeImpl(Array("value"), Array(objType))
-
-  before {
-    collector = new StructStreamingResultCollector
-    baseSender = new LocalResultSender(collector, 1)
-  }
-
-  test("transfer simple data") {
-    verifySimpleTransfer(sendDataType = true)
-  }
-
-  test("transfer simple data with no type info") {
-    verifySimpleTransfer(sendDataType = false)
-  }
-
-  /**
-   * Sends ten (Int, String) rows through sender and collector and checks they
-   * arrive unchanged and in order.
-   * @param sendDataType pass the struct type to the sender if true, `null` otherwise
-   */
-  def verifySimpleTransfer(sendDataType: Boolean): Unit = {
-    val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
-    val dataType = if (sendDataType) TwoColType else null
-    new StructStreamingResultSender(baseSender, dataType, iter).send()
-    // the two-column type is expected on the collector side in both cases
-    assert(TwoColType.equals(collector.getResultType))
-    val iter2 = collector.getResult
-    (0 to 9).foreach { i =>
-      assert(iter2.hasNext)
-      val o = iter2.next()
-      assert(o.size == 2)
-      assert(o(0).asInstanceOf[Int] == i)
-      assert(o(1).asInstanceOf[String] == i.toString * 5)
-    }
-    assert(! iter2.hasNext)
-  }
-
-
-  /**
-   * A test iterator that generate integer data
-   * @param start the 1st value
-   * @param n number of integers generated
-   * @param genExcp generate Exception if true. This is used to test exception handling:
-   *                `hasNext` then reports one extra element and the `next()` call for it
-   *                throws a RuntimeException("simulated error").
-   */
-  def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
-    new Iterator[Array[Object]] {
-      val max = if (genExcp) start + n else start + n - 1
-      var index: Int = start - 1
-
-      override def hasNext: Boolean = index < max
-
-      override def next(): Array[Object] =
-        if (index < (start + n - 1)) {
-          index += 1
-          Array(index.asInstanceOf[Object])
-        } else throw new RuntimeException("simulated error")
-    }
-  }
-
-  test("transfer data with 0 row") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
-    assert(collector.getResultType == null)
-    val iter = collector.getResult
-    assert(! iter.hasNext)
-  }
-
-  test("transfer data with 10K rows") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    (1 to 10000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(o(0).asInstanceOf[Int] == i)
-    }
-    assert(! iter.hasNext)
-  }
-
-  // The name now states the row count that is actually transferred (total = 300).
-  test("transfer data with 300 rows with 2 sender") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 300
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
-    Await.result(sender1, 1.seconds)
-    Await.result(sender2, 1.seconds)
-
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    // rows of the two senders may interleave, so only check that each value shows up once
-    val set = scala.collection.mutable.Set[Int]()
-    (1 to total).foreach { _ =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 1)
-      assert(! set.contains(o(0).asInstanceOf[Int]))
-      set.add(o(0).asInstanceOf[Int])
-    }
-    assert(! iter.hasNext)
-  }
-
-  // The name now states the row count that is actually transferred (total = 1000).
-  test("transfer data with 1000 rows with 2 sender with error") {
-    baseSender = new LocalResultSender(collector, 2)
-    val total = 1000
-    val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
-    val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
-    // `1.seconds` (not the postfix form `1 seconds`) to match the test above and
-    // to avoid depending on the postfixOps language feature
-    Await.result(sender1, 1.seconds)
-    Await.result(sender2, 1.seconds)
-
-    assert(OneColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    val set = scala.collection.mutable.Set[Int]()
-    // NOTE(review): ScalaTest's TestFailedException is itself a RuntimeException, so a
-    // failing assert inside this block also satisfies the intercept -- confirm that the
-    // simulated sender error is what gets caught here.
-    intercept[RuntimeException] {
-      (1 to total).foreach { _ =>
-        assert(iter.hasNext)
-        val o = iter.next()
-        assert(o.size == 1)
-        assert(! set.contains(o(0).asInstanceOf[Int]))
-        set.add(o(0).asInstanceOf[Int])
-      }
-    }
-  }
-
-  test("transfer data with Exception") {
-    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
-    val iter = collector.getResult
-    intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
-  }
-
-  /** Maps the values of `intIterator(1, n, genExcp)` to ("key-i", "value-i") rows. */
-  def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
-    intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
-
-  // The name now states the row count that is actually transferred (1000).
-  test("transfer string pair data with 1000 rows") {
-    new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
-    assert(TwoColType.equals(collector.getResultType))
-    val iter = collector.getResult
-    (1 to 1000).foreach { i =>
-      assert(iter.hasNext)
-      val o = iter.next()
-      assert(o.size == 2)
-      assert(o(0) == s"key-$i")
-      assert(o(1) == s"value-$i")
-    }
-    assert(! iter.hasNext)
-  }
-
-  /**
-   * Usage notes: There are 3 kinds of data to transfer:
-   * (1) object, (2) byte array of serialized object, and (3) byte array
-   * this test shows how to handle all of them.
-   */
-  test("DataSerializer usage") {
-    val outBuf = new HeapDataOutputStream(1024, null)
-    val inBuf = new ByteArrayDataInput()
-
-    // 1. a regular object
-    val hello = "Hello World!" * 30
-    // serialize the data
-    DataSerializer.writeObject(hello, outBuf)
-    val bytesHello = outBuf.toByteArray.clone()
-    // de-serialize the data
-    inBuf.initialize(bytesHello, Version.CURRENT)
-    val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello2)
-
-    // 2. byte array of serialized object
-    // serialize: byte array from `CachedDeserializable`
-    val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
-    outBuf.reset()
-    DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
-    // de-serialize the data in 2 steps
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    inBuf.initialize(bytesHello2, Version.CURRENT)
-    val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
-    assert(hello == hello3)
-
-    // 3. byte array
-    outBuf.reset()
-    DataSerializer.writeByteArray(bytesHello, outBuf)
-    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
-    val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
-    assert(bytesHello sameElements bytesHello3)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
deleted file mode 100644
index e33e9e8..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import org.scalatest.FunSuite
-
-class QueryParserTest extends FunSuite {
-
-  /**
-   * OQL queries paired with the value expected from `QueryParser.parseOQL(query).get`.
-   * Each query string doubles as the name of its test.
-   */
-  private val expectedResults: Seq[(String, String)] = Seq(
-    "select * from /r1" -> "List(/r1)",
-    "select c2 from /r1" -> "List(/r1)",
-    "select key, value from /r1.entries" -> "List(/r1.entries)",
-    "select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2" -> "List(/r1)",
-    "select * from /r1/r2 where c1 >= 200" -> "List(/r1/r2)",
-    "import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100" -> "List(/r1/r2, /r3/r4)",
-    "SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100" -> "List(/r1/r2)",
-    "IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc" -> "List(/root/sub.entries)",
-    "select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status" -> "List(/region)",
-    "SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID" -> "List(/QueryRegion1, /QueryRegion2)",
-    "SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'" -> "List(/obj_obj_region)",
-    "SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'" -> "List(/obj_obj_region, r.positions.values)"
-  )
-
-  // Register one test per query, in the order listed above. `.get` is intentional:
-  // a query that cannot be parsed fails its test instead of being silently skipped.
-  for ((query, expected) <- expectedResults) {
-    test(query) {
-      val r = QueryParser.parseOQL(query).get
-      assert(r == expected)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
deleted file mode 100644
index 4032ee8..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector
-
-import io.pivotal.gemfire.spark.connector._
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.Matchers
-
-class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("implicit map2Properties") {
-    val samples = Seq(
-      Map.empty[String, String],
-      Map("One" -> "1", "Two" -> "2", "Three" -> "3"))
-    samples.foreach(verifyProperties)
-  }
-
-  /**
-   * Turns `map` into a `java.util.Properties` through an implicit conversion in
-   * scope and checks that the size and every key/value pair are preserved.
-   */
-  def verifyProperties(map: Map[String, String]): Unit = {
-    val converted: java.util.Properties = map
-    assert(converted.size() == map.size)
-    for ((key, value) <- map)
-      assert(converted.getProperty(key) == value)
-  }
-
-  test("Test Implicit SparkContext Conversion") {
-    // the type ascription only compiles if the implicit conversion is found
-    val functions: GemFireSparkContextFunctions = mock[SparkContext]
-    functions shouldBe a [GemFireSparkContextFunctions]
-  }
-
-  test("Test Implicit SQLContext Conversion") {
-    // the type ascription only compiles if the implicit conversion is found
-    val functions: GemFireSQLContextFunctions = mock[SQLContext]
-    functions shouldBe a [GemFireSQLContextFunctions]
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
deleted file mode 100644
index 0e06db4..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector
-
-import org.apache.spark.SparkConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import io.pivotal.gemfire.spark.connector._
-
-class GemFireConnectionConfTest extends FunSuite with Matchers with MockitoSugar {
-
-  // Locator and property fixtures shared by the tests below
-  private val host1 = "host1"
-  private val port1 = 1234
-  private val host2 = "host2"
-  private val port2 = 5678
-  private val propK1 = "ack-severe-alert-threshold"
-  private val propV1 = "1"
-  private val propK2 = "ack-wait-threshold"
-  private val propV2 = "10"
-  private val bothLocators = Seq((host1, port1), (host2, port2))
-
-  test("apply(SparkConf) w/ GemFireLocator property and empty gemfireProps") {
-    val sparkConf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]")
-    val connConf = GemFireConnectionConf(sparkConf)
-    assert(connConf.locators == bothLocators)
-    assert(connConf.gemfireProps.isEmpty)
-  }
-
-  test("apply(SparkConf) w/ GemFireLocator property and gemfire properties") {
-    val sparkConf = new SparkConf()
-      .set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]")
-      .set(s"spark.gemfire.$propK1", propV1)
-      .set(s"spark.gemfire.$propK2", propV2)
-    val connConf = GemFireConnectionConf(sparkConf)
-    assert(connConf.locators == bothLocators)
-    // the "spark.gemfire." prefix is expected to be stripped from the keys
-    assert(connConf.gemfireProps == Map(propK1 -> propV1, propK2 -> propV2))
-  }
-
-  test("apply(SparkConf) w/o GemFireLocator property") {
-    val emptyConf = new SparkConf()
-    intercept[RuntimeException] { GemFireConnectionConf(emptyConf) }
-  }
-
-  test("apply(SparkConf) w/ invalid GemFireLocator property") {
-    val badConf = new SparkConf().set(GemFireLocatorPropKey, "local^host:1234")
-    intercept[Exception] { GemFireConnectionConf(badConf) }
-  }
-
-  test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non gemfireProps") {
-    val connConf = GemFireConnectionConf(s"$host1:$port1")
-    assert(connConf.locators == Seq((host1, port1)))
-    assert(connConf.gemfireProps.isEmpty)
-  }
-
-  test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non-empty gemfireProps") {
-    val gemfireProps = Map(propK1 -> propV1, propK2 -> propV2)
-    val connConf = GemFireConnectionConf(s"$host1:$port1,$host2:$port2", gemfireProps)
-    assert(connConf.locators == bothLocators)
-    assert(connConf.gemfireProps == gemfireProps)
-  }
-
-  test("apply(locatorStr, gemfireProps) w/ invalid locatorStr") {
-    intercept[Exception] { GemFireConnectionConf("local~host:4321") }
-  }
-
-  test("constructor w/ empty (host,port) pairs") {
-    intercept[IllegalArgumentException] { new GemFireConnectionConf(Seq.empty) }
-  }
-
-  test("getConnection() normal") {
-    // Mockito's matcher is fully qualified because ScalaTest's Matchers is mixed in
-    implicit val connManager: GemFireConnectionManager = mock[GemFireConnectionManager]
-    val expectedConnection = mock[GemFireConnection]
-    when(connManager.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenReturn(expectedConnection)
-    val connConf = GemFireConnectionConf("localhost:1234")
-    assert(connConf.getConnection == expectedConnection)
-    verify(connManager).getConnection(connConf)
-  }
-
-  test("getConnection() failure") {
-    // Mockito's matcher is fully qualified because ScalaTest's Matchers is mixed in
-    implicit val connManager: GemFireConnectionManager = mock[GemFireConnectionManager]
-    when(connManager.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenThrow(new RuntimeException)
-    val connConf = GemFireConnectionConf("localhost:1234")
-    intercept[RuntimeException] { connConf.getConnection }
-    verify(connManager).getConnection(connConf)
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
deleted file mode 100644
index 4117596..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.DStream
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-import scala.reflect.ClassTag
-
-class GemFireDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("test GemFirePairDStreamFunctions Implicit") {
-    import io.pivotal.gemfire.spark.connector.streaming._
-    val mockDStream = mock[DStream[(Int, String)]]
-    // the implicit conversion makes the following line valid
-    val pairDStream: GemFirePairDStreamFunctions[Int, String] = mockDStream
-    pairDStream shouldBe a[GemFirePairDStreamFunctions[_, _]]
-  }
-
-  test("test GemFireDStreamFunctions Implicit") {
-    import io.pivotal.gemfire.spark.connector.streaming._
-    val mockDStream = mock[DStream[String]]
-    // the implicit conversion makes the following line valid
-    val dstream: GemFireDStreamFunctions[String] = mockDStream
-    dstream shouldBe a[GemFireDStreamFunctions[_]]
-  }
-
-  /**
-   * Creates the mocks shared by the saveToGemfire tests.
-   *
-   * The connection conf mock returns the connection mock from `getConnection`
-   * and an empty locator list from `locators`; the region mock is not stubbed.
-   *
-   * @param regionPath region path handed back unchanged as the first tuple element
-   * @return (regionPath, mock connection conf, mock connection, mock region)
-   */
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
-    : (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = {
-    val mockConnection = mock[GemFireConnection]
-    val mockConnConf = mock[GemFireConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnConf.locators).thenReturn(Seq.empty)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GemFirePairDStreamFunctions.saveToGemfire()") {
-    import io.pivotal.gemfire.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, String]("test")
-    val mockDStream = mock[DStream[(String, String)]]
-    mockDStream.saveToGemfire(regionPath, mockConnConf)
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
-  }
-
-  test("test GemFireDStreamFunctions.saveToGemfire()") {
-    import io.pivotal.gemfire.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, Int]("test")
-    val mockDStream = mock[DStream[String]]
-    mockDStream.saveToGemfire[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf)
-    verify(mockConnConf).getConnection
-    // key/value types match the [String, Int] pair used by saveToGemfire above
-    verify(mockConnection).validateRegion[String, Int](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
deleted file mode 100644
index f2d49cb..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector._
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDWriter, GemFirePairRDDWriter}
-import org.apache.spark.{TaskContext, SparkContext}
-import org.apache.spark.rdd.RDD
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import collection.JavaConversions._
-import scala.reflect.ClassTag
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-class GemFireRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("test PairRDDFunction Implicit") {
-    val mockRDD = mock[RDD[(Int, String)]]
-    // the implicit conversion makes the following line valid
-    val pairRDD: GemFirePairRDDFunctions[Int, String] = mockRDD
-    pairRDD shouldBe a [GemFirePairRDDFunctions[_, _]]
-  }
-
-  test("test RDDFunction Implicit") {
-    val mockRDD = mock[RDD[String]]
-    // the implicit conversion makes the following line valid
-    val nonPairRDD: GemFireRDDFunctions[String] = mockRDD
-    nonPairRDD shouldBe a [GemFireRDDFunctions[_]]
-  }
-
-  /**
-   * Creates the mocks shared by the tests below.
-   *
-   * The connection conf mock returns the connection mock from `getConnection`,
-   * and the connection mock returns the region mock from
-   * `getRegionProxy[K, V](regionPath)`.
-   *
-   * @param regionPath region path handed back unchanged as the first tuple element
-   * @return (regionPath, mock connection conf, mock connection, mock region)
-   */
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = {
-    val mockConnection = mock[GemFireConnection]
-    val mockConnConf = mock[GemFireConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GemFirePairRDDWriter") {
-    val (regionPath, mockConnConf, _, mockRegion) = createMocks[String, String]("test")
-    val writer = new GemFirePairRDDWriter[String, String](regionPath, mockConnConf)
-    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
-    writer.write(null, data.toIterator)
-    val expectedMap: Map[String, String] = data.toMap
-    // JavaConversions turns the Scala map into the java.util.Map that putAll takes
-    verify(mockRegion).putAll(expectedMap)
-  }
-
-  test("test GemFireNonPairRDDWriter") {
-    val (regionPath, mockConnConf, _, mockRegion) = createMocks[Int, String]("test")
-    val writer = new GemFireRDDWriter[String, Int, String](regionPath, mockConnConf)
-    val data = List("a", "ab", "abc")
-    val f: String => (Int, String) = s => (s.length, s)
-    writer.write(f)(null, data.toIterator)
-    val expectedMap: Map[Int, String] = data.map(f).toMap
-    // JavaConversions turns the Scala map into the java.util.Map that putAll takes
-    verify(mockRegion).putAll(expectedMap)
-  }
-
-  test("test PairRDDFunctions.saveToGemfire") {
-    verifyPairRDDFunction(useOpConf = false)
-  }
-
-  test("test PairRDDFunctions.saveToGemfire w/ opConf") {
-    verifyPairRDDFunction(useOpConf = true)
-  }
-
-  /**
-   * Calls `saveToGemfire` on a mocked pair RDD and verifies that the region is
-   * validated once and that exactly one Spark job is submitted.
-   * @param useOpConf pass an operation config (batch size) to `saveToGemfire` if true
-   */
-  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
-    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, String]("test")
-    val mockRDD = mock[RDD[(String, String)]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    // The former `result === Unit` line was dropped: it compared against the `Unit`
-    // companion object and discarded the outcome, so it never asserted anything.
-    if (useOpConf)
-      mockRDD.saveToGemfire(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
-    else
-      mockRDD.saveToGemfire(regionPath, mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
-    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: the current implementation makes the following code non-compilable,
-    // so there is no negative test for this case:
-    //   val rdd: RDD[(K, V)] = ...
-    //   rdd.saveToGemfire(regionPath, s => (s.length, s))
-  }
-
-  test("test RDDFunctions.saveToGemfire") {
-    verifyRDDFunction(useOpConf = false)
-  }
-
-  test("test RDDFunctions.saveToGemfire w/ opConf") {
-    verifyRDDFunction(useOpConf = true)
-  }
-
-  /**
-   * Calls `saveToGemfire` with a pair-building function on a mocked non-pair RDD and
-   * verifies that the region is validated once and that exactly one Spark job is submitted.
-   * @param useOpConf pass an operation config (batch size) to `saveToGemfire` if true
-   */
-  def verifyRDDFunction(useOpConf: Boolean): Unit = {
-    val (regionPath, mockConnConf, mockConnection, _) = createMocks[Int, String]("test")
-    val mockRDD = mock[RDD[String]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    // The former `result === Unit` line was dropped: it compared against the `Unit`
-    // companion object and discarded the outcome, so it never asserted anything.
-    if (useOpConf)
-      mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
-    else
-      mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
-    verify(mockSparkContext, times(1)).runJob[String, Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: the current implementation makes the following code non-compilable,
-    // so there is no negative test for this case:
-    //   val rdd: RDD[T] = ... // T is not a (K, V) tuple
-    //   rdd.saveToGemfire(regionPath)
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
deleted file mode 100644
index bfb115a..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector
-
-import java.net.InetAddress
-
-import io.pivotal.gemfire.spark.connector.internal.LocatorHelper
-import org.scalatest.FunSuite
-
-class LocatorHelperTest extends FunSuite {
-
- test("locatorStr2HostPortPair hostname w/o domain") {
- val (host, port) = ("localhost", 10334)
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port))
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port))
- }
-
- test("locatorStr2HostPortPair hostname w/ domain") {
- val (host, port) = ("localhost", 10334)
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port))
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port))
- }
-
- test("locatorStr2HostPortPair w/ invalid host name") {
- // empty or null locatorStr
- assert(LocatorHelper.locatorStr2HostPortPair("").isFailure)
- assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure)
- // host name has leading `.`
- assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure)
- // host name has leading and/or tail white space
- assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure)
- assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure)
- assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure)
- // host name contain invalid characters
- assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure)
- assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure)
- assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure)
- }
-
- test("locatorStr2HostPortPair w/ valid port") {
- val host = "192.168.0.1"
- // port has 2, 3, 4, 5 digits
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20))
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 300))
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 4000))
- assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 50000))
- }
-
- test("locatorStr2HostPortPair w/ invalid port") {
- // port number is less than 2 digits
- assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure)
- // port number is more than 5 digits
- assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure)
- // port number is invalid
- assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure)
- }
-
- test("parseLocatorsString with valid locator(s)") {
- val (host1, port1) = ("localhost", 10334)
- assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, port1)))
- val (host2, port2) = ("localhost2", 10335)
- assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") == Seq((host1, port1),(host2, port2)))
- val (host3, port3) = ("localhost2", 10336)
- assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3") ==
- Seq((host1, port1),(host2, port2),(host3, port3)))
- }
-
- test("parseLocatorsString with invalid locator(s)") {
- // empty and null locatorsStr
- intercept[Exception] { LocatorHelper.parseLocatorsString("") }
- intercept[Exception] { LocatorHelper.parseLocatorsString(null) }
- // 1 bad locatorStr
- intercept[Exception] { LocatorHelper.parseLocatorsString("local%host.1234") }
- // 1 good locatorStr and 1 bad locatorStr
- intercept[Exception] { LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") }
- intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
- }
-
- test("pickPreferredGemFireServers: shared servers and one gf-server per host") {
- val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
- val servers = Seq(srv1, srv2, srv3, srv4)
- verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
- verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
- verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
- verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
- }
-
- test("pickPreferredGemFireServers: shared servers, one gf-server per host, un-sorted list") {
- val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
- val servers = Seq(srv4, srv2, srv3, srv1)
- verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
- verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4))
- verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1))
- verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2))
- }
-
- test("pickPreferredGemFireServers: shared servers and two gf-server per host") {
- val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
- val servers = Seq(srv1, srv2, srv3, srv4)
- verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
- verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
- verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
- verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
- }
-
- test("pickPreferredGemFireServers: shared servers, two gf-server per host, un-sorted server list") {
- val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004))
- val servers = Seq(srv1, srv4, srv3, srv2)
- verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3))
- verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3))
- verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1))
- verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1))
- }
-
- test("pickPreferredGemFireServers: no shared servers and one gf-server per host") {
- val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004))
- val servers = Seq(srv1, srv2, srv3, srv4)
- verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3))
- verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv3, srv4))
- verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv3, srv4, srv1))
- verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv4, srv1, srv2))
- }
-
- test("pickPreferredGemFireServers: no shared servers, one gf-server per host, and less gf-server") {
- val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
- val servers = Seq(srv1, srv2)
- verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2))
- verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv1))
- verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv1, srv2))
- verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv2, srv1))
-
-
- println("host name: " + InetAddress.getLocalHost.getHostName)
- println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName)
- println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName)
- }
-
- test("pickPreferredGemFireServers: ad-hoc") {
- val (srv4, srv5, srv6) = (
- ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411))
- val servers = Seq(srv6, srv5, srv4)
- verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6))
- verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6))
- verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4))
- verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5))
- }
-
- def verifyPickPreferredGemFireServers(
- servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = {
- val result = LocatorHelper.pickPreferredGemFireServers(servers, hostName, executorId)
- assert(result == expectation, s"pick servers for $hostName:$executorId")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala
deleted file mode 100644
index f6a30c7..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector.rdd
-
-import com.gemstone.gemfire.distributed.internal.ServerLocation
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._
-import io.pivotal.gemfire.spark.connector.GemFireConnection
-import io.pivotal.gemfire.spark.connector.internal.rdd._
-import org.apache.spark.Partition
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-
-import java.util.{HashSet => JHashSet, HashMap => JHashMap}
-
-import scala.collection.mutable
-
-class GemFireRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar {
-
- val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap()
-
- def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = {
- import scala.collection.JavaConversions._
- val tmp = map.map {case ((host, port), set) => (new ServerLocation(host, port), set.map(Integer.valueOf))}
- (new JHashMap[ServerLocation, JHashSet[Integer]]() /: tmp) { case (acc, (s, jset)) => acc.put(s, new JHashSet(jset)); acc }
- }
-
- val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map(
- ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5))
-
-
- // update this test whenever change default setting
- test("default partitioned region partitioner") {
- assert(GemFireRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner)
- }
-
- // update this test whenever change default setting
- test("default replicated region partitioner") {
- assert(GemFireRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner)
- }
-
- test("GemFireRDDPartitioner.apply method") {
- import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._
- for ((name, partitioner) <- partitioners) assert(GemFireRDDPartitioner(name) == partitioner)
- assert(GemFireRDDPartitioner("dummy") == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner)
- assert(GemFireRDDPartitioner() == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner)
- }
-
- test("OnePartitionPartitioner") {
- val mockConnection = mock[GemFireConnection]
- val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty)
- verifySinglePartition(partitions)
- }
-
- def verifySinglePartition(partitions: Array[Partition]): Unit = {
- assert(1 == partitions.size)
- assert(partitions(0).index === 0)
- assert(partitions(0).isInstanceOf[GemFireRDDPartition])
- assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty)
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") {
- val map: List[(String, mutable.Set[Int])] = List(
- "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1)
- verifyPartitions(partitions, List(
- (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") {
- val map: List[(String, mutable.Set[Int])] = List(
- "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1)
- verifyPartitions(partitions, List(
- (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") {
- val map: List[(String, mutable.Set[Int])] = List(
- "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
- verifyPartitions(partitions, List(
- (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") {
- val map: List[(String, mutable.Set[Int])] = List(
- "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1)
- verifyPartitions(partitions, List(
- (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") {
- val map: List[(String, mutable.Set[Int])] = List(
- "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
- verifyPartitions(partitions, List(
- (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") {
- val map: List[(String, mutable.Set[Int])] = List(
- "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2)
- // partitions.foreach(println)
- verifyPartitions(partitions, List(
- (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
- }
-
- test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") {
- val map: List[(String, mutable.Set[Int])] = List(
- "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8))
- val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3)
- // partitions.foreach(println)
- verifyPartitions(partitions, List(
- (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")),
- (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) ))
- }
-
- test("ServerSplitsPartitioner.partitions(): metadata = None ") {
- val regionPath = "test"
- val mockConnection = mock[GemFireConnection]
- intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) }
- }
-
- test("ServerSplitsPartitioner.partitions(): replicated region ") {
- val regionPath = "test"
- val mockConnection = mock[GemFireConnection]
- val md = new RegionMetadata(regionPath, false, 11, null)
- when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
- val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
- verifySinglePartition(partitions)
- }
-
- test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") {
- val regionPath = "test"
- val mockConnection = mock[GemFireConnection]
- val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap)
- when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
- val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
- verifySinglePartition(partitions)
- }
-
- test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") {
- import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey
- val regionPath = "test"
- val mockConnection = mock[GemFireConnection]
- val map: Map[(String, Int), Set[Int]] = Map(
- ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), ("s3",4) -> Set(3, 4, 5))
- val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map))
- when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
- val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2"))
- // partitions.foreach(println)
- verifyPartitions(partitions, List(
- (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
- }
-
- // Note: since the order of partitions is not pre-determined, we have to verify partition id
- // and contents separately
- def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = {
- // 1. check size
- assert(partitions.size == expPartitions.size)
- // 2. check IDs are 0 to n-1
- (0 until partitions.size).toList.zip(partitions).foreach { case (id, p) => assert(id == p.index) }
-
- // 3. get all pairs of bucket set and its locations, and compare to the expected pairs
- val list = partitions.map { e =>
- val p = e.asInstanceOf[GemFireRDDPartition]
- (p.bucketSet, p.locations)
- }
- expPartitions.foreach(e => assert(list.contains(e)))
- }
-
-}