You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:22 UTC
[47/50] [abbrv] incubator-geode git commit: GEODE-1244: Package,
directory, project and file rename for geode-spark-connector
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala
deleted file mode 100644
index b4ee572..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.gemfire.spark.connector.testkit
-
-import java.io.{IOException, File}
-import java.net.InetAddress
-import java.util.Properties
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.IOFileFilter
-
-/**
-* A class that manages GemFire locator and servers. Uses gfsh to
-* start and stop the locator and servers.
-*/
-class GemFireRunner(settings: Properties) {
- val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString
- val cacheXMLFile = settings.get("cache-xml-file")
- val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt
- val cwd = new File(".").getAbsolutePath
- val gemfireFunctionsTargetDir = new File("../gemfire-functions/target")
- val testroot = "target/testgemfire"
- val classpath = new File(cwd, "target/scala-2.10/it-classes/")
- val locatorPort = startGemFireCluster(numServers)
-
- def getLocatorPort: Int = locatorPort
-
- private def getCurrentDirectory = new File( "." ).getCanonicalPath
-
- private def startGemFireCluster(numServers: Int): Int = {
- //ports(0) for GemFire locator, the other ports are for GemFire servers
- val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
- startGemFireLocator(ports(0), ports(1))
- startGemFireServers(ports(0), ports.drop(2))
- registerFunctions(ports(1))
- ports(0)
- }
-
- private def startGemFireLocator(locatorPort: Int, jmxHttpPort:Int) {
- println(s"=== GemFireRunner: starting locator on port $locatorPort")
- val locatorDir = new File(cwd, s"$testroot/locator")
- if (locatorDir.exists())
- FileUtils.deleteDirectory(locatorDir)
- IOUtils.mkdir(locatorDir)
- new ProcessBuilder()
- .command(gfshCmd, "start", "locator",
- "--name=locator",
- s"--dir=$locatorDir",
- s"--port=$locatorPort",
- s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
- .inheritIO()
- .start()
-
- // Wait 30 seconds for locator to start
- println(s"=== GemFireRunner: waiting for locator on port $locatorPort")
- if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
- throw new IOException("Failed to start GemFire locator.")
- println(s"=== GemFireRunner: done waiting for locator on port $locatorPort")
- }
-
- private def startGemFireServers(locatorPort: Int, serverPorts: Seq[Int]) {
- val procs = for (i <- 0 until serverPorts.length) yield {
- println(s"=== GemFireRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
- val serverDir = new File(cwd, s"$testroot/server${i+1}")
- if (serverDir.exists())
- FileUtils.deleteDirectory(serverDir)
- IOUtils.mkdir(serverDir)
- new ProcessBuilder()
- .command(gfshCmd, "start", "server",
- s"--name=server${i+1}",
- s"--locators=localhost[$locatorPort]",
- s"--bind-address=localhost",
- s"--server-port=${serverPorts(i)}",
- s"--dir=$serverDir",
- s"--cache-xml-file=$cacheXMLFile",
- s"--classpath=$classpath")
- .inheritIO()
- .start()
- }
- procs.foreach(p => p.waitFor)
- println(s"All $serverPorts.length servers have been started")
- }
-
- private def registerFunctions(jmxHttpPort:Int) {
- import scala.collection.JavaConversions._
- FileUtils.listFiles(gemfireFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)}
- }
-
- def fileFilter = new IOFileFilter {
- def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("gemfire-functions")
- def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("gemfire-functions")
- }
-
- def dirFilter = new IOFileFilter {
- def accept (file: File) = file.getName.startsWith("scala")
- def accept (dir: File, name: String) = name.startsWith("scala")
- }
-
- private def registerFunction(jmxHttpPort:Int, jar:File) {
- println("Deploying:" + jar.getName)
- import io.pivotal.gemfire.spark.connector.GemFireFunctionDeployer
- val deployer = new GemFireFunctionDeployer(new HttpClient())
- deployer.deploy("localhost", jmxHttpPort, jar)
- }
-
- def stopGemFireCluster(): Unit = {
- stopGemFireServers(numServers)
- stopGemFireLocator()
- if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
- throw new IOException(s"Failed to stop GemFire locator at port $getLocatorPort.")
- println(s"Successfully stop GemFire locator at port $getLocatorPort.")
- }
-
- private def stopGemFireLocator() {
- println(s"=== GemFireRunner: stop locator")
- val p = new ProcessBuilder()
- .inheritIO()
- .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
- .start()
- p.waitFor()
- }
-
- private def stopGemFireServers(numServers: Int) {
- val procs = for (i <-1 to numServers) yield {
- println(s"=== GemFireRunner: stop server $i.")
- new ProcessBuilder()
- .inheritIO()
- .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
- .start()
- }
- procs.foreach(p => p.waitFor())
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala
deleted file mode 100644
index 28134a8..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.gemfire.spark.connector.testkit
-
-import java.io.{File, IOException}
-import java.net.{InetAddress, Socket}
-import com.gemstone.gemfire.internal.AvailablePort
-import scala.util.Try
-import org.apache.log4j.PropertyConfigurator
-import java.util.Properties
-
-object IOUtils {
-
- /** Makes a new directory or throws an `IOException` if it cannot be made */
- def mkdir(dir: File): File = {
- if (!dir.mkdirs())
- throw new IOException(s"Could not create dir $dir")
- dir
- }
-
- private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually {
- Try {
- Thread.sleep(100)
- new Socket(host, port).close()
- }
- }
-
- /**
- * Waits until a port at the given address is open or timeout passes.
- * @return true if managed to connect to the port, false if timeout happened first
- */
- def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- socketPortProb(host, port)
- .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout)
- .next()
- .isSuccess
- }
-
- /**
- * Waits until a port at the given address is close or timeout passes.
- * @return true if host:port is un-connect-able, false if timeout happened first
- */
- def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- socketPortProb(host, port)
- .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout)
- .next()
- .isFailure
- }
-
- /**
- * Returns array of unique randomly available tcp ports of specified count.
- */
- def getRandomAvailableTCPPorts(count: Int): Seq[Int] =
- (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET))
- .map{x => x.release(); x.getPort}.toArray
-
- /**
- * config a log4j properties used for integration tests
- */
- def configTestLog4j(level: String, props: (String, String)*): Unit = {
- val pro = new Properties()
- props.foreach(p => pro.put(p._1, p._2))
- configTestLog4j(level, pro)
- }
-
- def configTestLog4j(level: String, props: Properties): Unit = {
- val pro = new Properties()
- pro.put("log4j.rootLogger", s"$level, console")
- pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
- pro.put("log4j.appender.console.target", "System.err")
- pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
- pro.put("log4j.appender.console.layout.ConversionPattern",
- "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
- pro.putAll(props)
- PropertyConfigurator.configure(pro)
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
deleted file mode 100644
index 67f9e57..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.util.ManualClock
-
-object ManualClockHelper {
-
- def addToTime(ssc: StreamingContext, timeToAdd: Long): Unit = {
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(timeToAdd)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
deleted file mode 100644
index fce1e67..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.reflect.ClassTag
-
-class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
- extends InputDStream[T](ssc_) {
-
- def start() {}
-
- def stop() {}
-
- def compute(validTime: Time): Option[RDD[T]] = {
- logInfo("Computing RDD for time " + validTime)
- val index = ((validTime - zeroTime) / slideDuration - 1).toInt
- val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
- // lets us test cases where RDDs are not created
- if (selectedInput == null)
- return None
-
- val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
- logInfo("Created RDD " + rdd.id + " with " + selectedInput)
- Some(rdd)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
deleted file mode 100644
index 527b462..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import java.util.Properties;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
- * to provide GemFire Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaDStreamFunctions<T> {
-
- public final GemFireDStreamFunctions<T> dsf;
-
- public GemFireJavaDStreamFunctions(JavaDStream<T> ds) {
- this.dsf = new GemFireDStreamFunctions<T>(ds.dstream());
- }
-
- /**
- * Save the JavaDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) {
- dsf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param opConf the optional parameters for this operation
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
- dsf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func) {
- dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
deleted file mode 100644
index 6556462..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import java.util.Properties;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
- * to provide GemFire Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaPairDStreamFunctions<K, V> {
-
- public final GemFirePairDStreamFunctions<K, V> dsf;
-
- public GemFireJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
- this.dsf = new GemFirePairDStreamFunctions<K, V>(ds.dstream());
- }
-
- /**
- * Save the JavaPairDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) {
- dsf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- */
- public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
- dsf.saveToGemfire(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the JavaPairDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- * @param opConf the optional parameters for this operation
- */
- public void saveToGemfire(String regionPath, Properties opConf) {
- dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the JavaPairDStream to GemFire key-value store.
- * @param regionPath the full path of region that the DStream is stored
- */
- public void saveToGemfire(String regionPath) {
- dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
deleted file mode 100644
index 72fa7a9..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.GemFirePairRDDFunctions;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import scala.Option;
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide GemFire Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaPairRDDFunctions<K, V> {
-
- public final GemFirePairRDDFunctions<K, V> rddf;
-
- public GemFireJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
- this.rddf = new GemFirePairRDDFunctions<K, V>(rdd.rdd());
- }
-
- /**
- * Save the pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the parameters for this operation
- */
- public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) {
- rddf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param opConf the parameters for this operation
- */
- public void saveToGemfire(String regionPath, Properties opConf) {
- rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- */
- public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
- rddf.saveToGemfire(regionPath, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the pair RDD to GemFire key-value store with the default GemFireConnector.
- * @param regionPath the full path of region that the RDD is stored
- */
- public void saveToGemfire(String regionPath) {
- rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the GemFire `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String regionPath) {
- return joinGemfireRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an JavaPairRDD containing all pairs of elements with matching keys in
- * this RDD<K, V> and the GemFire `Region<K, V2>`. Each pair of elements
- * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
- * (k, v2) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<<K, V>, V2>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
- String regionPath, GemFireConnectionConf connConf) {
- GemFireJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGemfireRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the GemFire `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the GemFire
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<K, V> and the GemFire `Region<K2, V2>`. The join key from RDD
- * element is generated by `func(K, V) => K2`, and the key from the GemFire
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <K2> the key type of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, V2>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
- GemFireJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<V2> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key k.
- *
- * @param regionPath the region path of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(String regionPath) {
- return outerJoinGemfireRegion(regionPath, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K, V2>`.
- * For each element (k, v) in this RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key k.
- *
- * @param regionPath the region path of the GemFire region
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
- String regionPath, GemFireConnectionConf connConf) {
- GemFireOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param <K2> the key type of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func) {
- return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K2, V2>`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <K2> the key type of the GemFire region
- * @param <V2> the value type of the GemFire region
- * @return JavaPairRDD<Tuple2<K, V>, Option<V>>
- */
- public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
- String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
- GemFireOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
- ClassTag<Tuple2<K, V>> kt = fakeClassTag();
- ClassTag<Option<V2>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
deleted file mode 100644
index 519ba6e..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Option;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide GemFire Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaRDDFunctions<T> {
-
- public final GemFireRDDFunctions<T> rddf;
-
- public GemFireJavaRDDFunctions(JavaRDD<T> rdd) {
- this.rddf = new GemFireRDDFunctions<T>(rdd.rdd());
- }
-
- /**
- * Save the non-pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) {
- rddf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
- rddf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap());
- }
-
- /**
- * Save the non-pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- * @param opConf the parameters for this operation
- */
- public <K, V> void saveToGemfire(
- String regionPath, PairFunction<T, K, V> func, Properties opConf) {
- rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
- }
-
- /**
- * Save the non-pair RDD to GemFire key-value store with default GemFireConnector.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
- */
- public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) {
- rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the GemFire
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the GemFire region
- * @param <V> the value type of the GemFire region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) {
- return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in this
- * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD
- * element is generated by `func(T) => K`, and the key from the GemFire
- * region is just the key of the key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
- * where t is from this RDD and v is from the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <K> the key type of the GemFire region
- * @param <V> the value type of the GemFire region
- * @return JavaPairRDD<T, V>
- */
- public <K, V> JavaPairRDD<T, V> joinGemfireRegion(
- String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
- GemFireJoinRDD<T, K, V> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the GemFire region, or the pair
- * (t, None) if no element in the GemFire region have key `func(t)`.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element T
- * @param <K> the key type of the GemFire region
- * @param <V> the value type of the GemFire region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) {
- return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
- }
-
- /**
- * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in this RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the GemFire region, or the pair
- * (t, None) if no element in the GemFire region have key `func(t)`.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element T
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param <K> the key type of the GemFire region
- * @param <V> the value type of the GemFire region
- * @return JavaPairRDD<T, Option<V>>
- */
- public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(
- String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
- GemFireOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
- ClassTag<T> kt = fakeClassTag();
- ClassTag<Option<V>> vt = fakeClassTag();
- return new JavaPairRDD<>(rdd, kt, vt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
deleted file mode 100644
index 980c409..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.GemFireSQLContextFunctions;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-
-/**
- * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide GemFire
- * OQL functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaSQLContextFunctions {
-
- public final GemFireSQLContextFunctions scf;
-
- public GemFireJavaSQLContextFunctions(SQLContext sqlContext) {
- scf = new GemFireSQLContextFunctions(sqlContext);
- }
-
- public <T> DataFrame gemfireOQL(String query) {
- DataFrame df = scf.gemfireOQL(query, scf.defaultConnectionConf());
- return df;
- }
-
- public <T> DataFrame gemfireOQL(String query, GemFireConnectionConf connConf) {
- DataFrame df = scf.gemfireOQL(query, connConf);
- return df;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
deleted file mode 100644
index f8b930c..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD;
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD$;
-import org.apache.spark.SparkContext;
-import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
-
-import scala.reflect.ClassTag;
-import java.util.Properties;
-
-/**
- * Java API wrapper over {@link org.apache.spark.SparkContext} to provide GemFire
- * Connector functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
- */
-public class GemFireJavaSparkContextFunctions {
-
- public final SparkContext sc;
-
- public GemFireJavaSparkContextFunctions(SparkContext sc) {
- this.sc = sc;
- }
-
- /**
- * Expose a GemFire region as a JavaPairRDD
- * @param regionPath the full path of the region
- * @param connConf the GemFireConnectionConf that can be used to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(
- String regionPath, GemFireConnectionConf connConf, Properties opConf) {
- ClassTag<K> kt = fakeClassTag();
- ClassTag<V> vt = fakeClassTag();
- GemFireRegionRDD<K, V> rdd = GemFireRegionRDD$.MODULE$.apply(
- sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
- return new GemFireJavaRegionRDD<>(rdd);
- }
-
- /**
- * Expose a GemFire region as a JavaPairRDD with default GemFireConnector and no preferred partitioner.
- * @param regionPath the full path of the region
- */
- public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath) {
- GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf());
- return gemfireRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a GemFire region as a JavaPairRDD with no preferred partitioner.
- * @param regionPath the full path of the region
- * @param connConf the GemFireConnectionConf that can be used to access the region
- */
- public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, GemFireConnectionConf connConf) {
- return gemfireRegion(regionPath, connConf, new Properties());
- }
-
- /**
- * Expose a GemFire region as a JavaPairRDD with default GemFireConnector.
- * @param regionPath the full path of the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
- public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, Properties opConf) {
- GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf());
- return gemfireRegion(regionPath, connConf, opConf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
deleted file mode 100644
index 679f197..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import scala.Tuple2;
-
-import io.pivotal.gemfire.spark.connector.package$;
-
-/**
- * The main entry point to Spark GemFire Connector Java API.
- *
- * There are several helpful static factory methods which build useful wrappers
- * around Spark Context, Streaming Context and RDD. There are also helper methods
- * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
- */
-public final class GemFireJavaUtil {
-
- /** constants */
- public static String GemFireLocatorPropKey = package$.MODULE$.GemFireLocatorPropKey();
- // partitioner related keys and values
- public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
- public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
- public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
- public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
- public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
- public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
-
- /** The private constructor is used prevents user from creating instance of this class. */
- private GemFireJavaUtil() { }
-
- /**
- * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
- * on an existing {@link SparkContext} instance.
- */
- public static GemFireJavaSparkContextFunctions javaFunctions(SparkContext sc) {
- return new GemFireJavaSparkContextFunctions(sc);
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
- * on an existing {@link JavaSparkContext} instance.
- */
- public static GemFireJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
- return new GemFireJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaPairRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
- */
- public static <K, V> GemFireJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
- return new GemFireJavaPairRDDFunctions<K, V>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaRDDFunctions} based on an
- * existing {@link org.apache.spark.api.java.JavaRDD} instance.
- */
- public static <T> GemFireJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
- return new GemFireJavaRDDFunctions<T>(rdd);
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaPairDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
- */
- public static <K, V> GemFireJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
- return new GemFireJavaPairDStreamFunctions<>(ds);
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaDStreamFunctions} based on an
- * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
- */
- public static <T> GemFireJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
- return new GemFireJavaDStreamFunctions<>(ds);
- }
-
- /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>.
- */
- public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
- return JavaAPIHelper.toJavaPairRDD(rdd);
- }
-
- /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>>
- * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>.
- */
- public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
- return JavaAPIHelper.toJavaPairDStream(ds);
- }
-
- /**
- * A static factory method to create a {@link GemFireJavaSQLContextFunctions} based
- * on an existing {@link SQLContext} instance.
- */
- public static GemFireJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
- return new GemFireJavaSQLContextFunctions(sqlContext);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
deleted file mode 100644
index 39ec1c1..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import com.gemstone.gemfire.cache.execute.ResultCollector
-import com.gemstone.gemfire.cache.query.Query
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
-
-
-trait GemFireConnection {
-
- /**
- * Validate region existence and key/value type constraints, throw RuntimeException
- * if region does not exist or key and/or value type do(es) not match.
- * @param regionPath the full path of region
- */
- def validateRegion[K, V](regionPath: String): Unit
-
- /**
- * Get Region proxy for the given region
- * @param regionPath the full path of region
- */
- def getRegionProxy[K, V](regionPath: String): Region[K, V]
-
- /**
- * Retrieve region meta data for the given region.
- * @param regionPath: the full path of the region
- * @return Some[RegionMetadata] if region exists, None otherwise
- */
- def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
-
- /**
- * Retrieve region data for the given region and bucket set
- * @param regionPath: the full path of the region
- * @param whereClause: the set of bucket IDs
- * @param split: GemFire RDD Partition instance
- */
- def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)]
-
- def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
- /**
- * Create a gemfire OQL query
- * @param queryString GemFire OQL query string
- */
- def getQuery(queryString: String): Query
-
- /** Close the connection */
- def close(): Unit
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
deleted file mode 100644
index ea6d246..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import org.apache.spark.SparkConf
-import io.pivotal.gemfire.spark.connector.internal.{DefaultGemFireConnectionManager, LocatorHelper}
-
-/**
- * Stores configuration of a connection to GemFire cluster. It is serializable and can
- * be safely sent over network.
- *
- * @param locators GemFire locator host:port pairs, the default is (localhost,10334)
- * @param gemfireProps The initial gemfire properties to be used.
- * @param connectionManager GemFireConnectionFactory instance
- */
-class GemFireConnectionConf(
- val locators: Seq[(String, Int)],
- val gemfireProps: Map[String, String] = Map.empty,
- connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager
- ) extends Serializable {
-
- /** require at least 1 pair of (host,port) */
- require(locators.nonEmpty)
-
- def getConnection: GemFireConnection = connectionManager.getConnection(this)
-
-}
-
-object GemFireConnectionConf {
-
- /**
- * create GemFireConnectionConf object based on locator string and optional GemFireConnectionFactory
- * @param locatorStr GemFire cluster locator string
- * @param connectionManager GemFireConnection factory
- */
- def apply(locatorStr: String, gemfireProps: Map[String, String] = Map.empty)
- (implicit connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager): GemFireConnectionConf = {
- new GemFireConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), gemfireProps, connectionManager)
- }
-
- /**
- * create GemFireConnectionConf object based on SparkConf. Note that implicit can
- * be used to control what GemFireConnectionFactory instance to use if desired
- * @param conf a SparkConf instance
- */
- def apply(conf: SparkConf): GemFireConnectionConf = {
- val locatorStr = conf.getOption(GemFireLocatorPropKey).getOrElse(
- throw new RuntimeException(s"SparkConf does not contain property $GemFireLocatorPropKey"))
- // SparkConf only holds properties whose key starts with "spark.", In order to
- // put gemfire properties in SparkConf, all gemfire properties are prefixes with
- // "spark.gemfire.". This prefix was removed before the properties were put in `gemfireProp`
- val prefix = "spark.gemfire."
- val gemfireProps = conf.getAll.filter {
- case (k, v) => k.startsWith(prefix) && k != GemFireLocatorPropKey
- }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
- apply(locatorStr, gemfireProps)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
deleted file mode 100644
index 7386f5c..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-/**
- * GemFireConnectionFactory provide an common interface that manages GemFire
- * connections, and it's serializable. Each factory instance will handle
- * connection instance creation and connection pool management.
- */
-trait GemFireConnectionManager extends Serializable {
-
- /** get connection for the given connector */
- def getConnection(connConf: GemFireConnectionConf): GemFireConnection
-
- /** close the connection */
- def closeConnection(connConf: GemFireConnectionConf): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
deleted file mode 100644
index 96a7e81..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import java.io.File
-import java.net.URL
-import org.apache.commons.httpclient.methods.PostMethod
-import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.spark.Logging
-
-object GemFireFunctionDeployer {
- def main(args: Array[String]) {
- new GemFireFunctionDeployer(new HttpClient()).commandLineRun(args)
- }
-}
-
-class GemFireFunctionDeployer(val httpClient:HttpClient) extends Logging {
-
- def deploy(host: String, port: Int, jarLocation: String): String =
- deploy(host + ":" + port, jarLocation)
-
- def deploy(host: String, port: Int, jar:File): String =
- deploy(host + ":" + port, jar)
-
- def deploy(jmxHostAndPort: String, jarLocation: String): String =
- deploy(jmxHostAndPort, jarFileHandle(jarLocation))
-
- def deploy(jmxHostAndPort: String, jar: File): String = {
- val urlString = constructURLString(jmxHostAndPort)
- val filePost: PostMethod = new PostMethod(urlString)
- val parts: Array[Part] = new Array[Part](1)
- parts(0) = new FilePart("resources", jar)
- filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
- val status: Int = httpClient.executeMethod(filePost)
- "Deployed Jar with status:" + status
- }
-
- private[connector] def constructURLString(jmxHostAndPort: String) =
- "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
-
- private[connector]def jarFileHandle(jarLocation: String) = {
- val f: File = new File(jarLocation)
- if (!f.exists()) {
- val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
- logInfo(errorMessage)
- throw new RuntimeException(errorMessage)
- }
- f
- }
-
- def commandLineRun(args: Array[String]):Unit = {
- val (hostPort: String, jarFile: String) =
- if (args.length < 2) {
- logInfo("JMX Manager Host and Port (example: localhost:7070):")
- val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
- val jmxHostAndPort = bufferedReader.readLine()
- logInfo("Location of gemfire-functions.jar:")
- val functionJarLocation = bufferedReader.readLine()
- (jmxHostAndPort, functionJarLocation)
- } else {
- (args(0), args(1))
- }
- val status = deploy(hostPort, jarFile)
- logInfo(status)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
deleted file mode 100644
index 196d991..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import com.esotericsoftware.kryo.Kryo
-import io.pivotal.gemfire.spark.connector.internal.oql.UndefinedSerializer
-import org.apache.spark.serializer.KryoRegistrator
-import com.gemstone.gemfire.cache.query.internal.Undefined
-
-class GemFireKryoRegistrator extends KryoRegistrator{
-
- override def registerClasses(kyro: Kryo): Unit = {
- kyro.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
deleted file mode 100644
index 63583da..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFirePairRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.Function
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion.
- * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
- * use these functions.
- */
-class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
-
- /**
- * Save the RDD of pairs to GemFire key-value store without any conversion
- * @param regionPath the full path of region that the RDD is stored
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGemfire(
- regionPath: String,
- connConf: GemFireConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- if (log.isDebugEnabled)
- logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
- else
- logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
- val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
- rdd.sparkContext.runJob(rdd, writer.write _)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this`
- * RDD and the GemFire `Region[K, V2]`. Each pair of elements will be returned
- * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
- * GemFire region.
- *
- *@param regionPath the region path of the GemFire region
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K2 the key type of the GemFire region
- * @tparam V2 the value type of the GemFire region
- * @return RDD[T, V]
- */
- def joinGemfireRegion[K2 <: K, V2](
- regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K, V2] = {
- new GemFireJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` RDD
- * and the GemFire `Region[K2, V2]`. The join key from RDD element is generated by
- * `func(K, V) => K2`, and the key from the GemFire region is jus the key of the
- * key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
- * where (k, v) is in `this` RDD and (k2, v2) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K2 the key type of the GemFire region
- * @tparam V2 the value type of the GemFire region
- * @return RDD[(K, V), V2]
- */
- def joinGemfireRegion[K2, V2](
- regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K2, V2] =
- new GemFireJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
-
- /** This version of joinGemfireRegion(...) is just for Java API. */
- private[connector] def joinGemfireRegion[K2, V2](
- regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = {
- new GemFireJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the GemFire `Region[K, V2]`.
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key k.
- *
- * @param regionPath the region path of the GemFire region
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K2 the key type of the GemFire region
- * @tparam V2 the value type of the GemFire region
- * @return RDD[ (K, V), Option[V] ]
- */
- def outerJoinGemfireRegion[K2 <: K, V2](
- regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K, V2] = {
- new GemFireOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the GemFire `Region[K2, V2]`.
- * The join key from RDD element is generated by `func(K, V) => K2`, and the
- * key from region is jus the key of the key/value pair.
- *
- * For each element (k, v) in `this` RDD, the resulting RDD will either contain
- * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
- * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
- *
- *@param regionPath the region path of the GemFire region
- * @param func the function that generates region key from RDD element (K, V)
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K2 the key type of the GemFire region
- * @tparam V2 the value type of the GemFire region
- * @return RDD[ (K, V), Option[V] ]
- */
- def outerJoinGemfireRegion[K2, V2](
- regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
- new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
- }
-
- /** This version of outerJoinGemfireRegion(...) is just for Java API. */
- private[connector] def outerJoinGemfireRegion[K2, V2](
- regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
- new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
- }
-
- private[connector] def defaultConnectionConf: GemFireConnectionConf =
- GemFireConnectionConf(rdd.sparkContext.getConf)
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
deleted file mode 100644
index 2039b7f..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.{PairFunction, Function}
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra gemFire functions on non-Pair RDDs through an implicit conversion.
- * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
- * use these functions.
- */
-class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
-
- /**
- * Save the non-pair RDD to GemFire key-value store.
- * @param regionPath the full path of region that the RDD is stored
- * @param func the function that converts elements of RDD to key/value pairs
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @param opConf the optional parameters for this operation
- */
- def saveToGemfire[K, V](
- regionPath: String,
- func: T => (K, V),
- connConf: GemFireConnectionConf = defaultConnectionConf,
- opConf: Map[String, String] = Map.empty): Unit = {
- connConf.getConnection.validateRegion[K, V](regionPath)
- if (log.isDebugEnabled)
- logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
- else
- logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
- val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
- rdd.sparkContext.runJob(rdd, writer.write(func) _)
- }
-
- /** This version of saveToGemfire(...) is just for Java API. */
- private[connector] def saveToGemfire[K, V](
- regionPath: String,
- func: PairFunction[T, K, V],
- connConf: GemFireConnectionConf,
- opConf: Map[String, String]): Unit = {
- saveToGemfire[K, V](regionPath, func.call _, connConf, opConf)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` RDD
- * and the GemFire `Region[K, V]`. The join key from RDD element is generated by
- * `func(T) => K`, and the key from the GemFire region is just the key of the
- * key/value pair.
- *
- * Each pair of elements of result RDD will be returned as a (t, v) tuple,
- * where (t) is in `this` RDD and (k, v) is in the GemFire region.
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generate region key from RDD element T
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K the key type of the GemFire region
- * @tparam V the value type of the GemFire region
- * @return RDD[T, V]
- */
- def joinGemfireRegion[K, V](regionPath: String, func: T => K,
- connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] = {
- new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf)
- }
-
- /** This version of joinGemfireRegion(...) is just for Java API. */
- private[connector] def joinGemfireRegion[K, V](
- regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = {
- joinGemfireRegion(regionPath, func.call _, connConf)
- }
-
- /**
- * Perform a left outer join of `this` RDD and the GemFire `Region[K, V]`.
- * The join key from RDD element is generated by `func(T) => K`, and the
- * key from region is just the key of the key/value pair.
- *
- * For each element (t) in `this` RDD, the resulting RDD will either contain
- * all pairs (t, Some(v)) for v in the GemFire region, or the pair
- * (t, None) if no element in the GemFire region have key `func(t)`
- *
- * @param regionPath the region path of the GemFire region
- * @param func the function that generate region key from RDD element T
- * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
- * @tparam K the key type of the GemFire region
- * @tparam V the value type of the GemFire region
- * @return RDD[ T, Option[V] ]
- */
- def outerJoinGemfireRegion[K, V](regionPath: String, func: T => K,
- connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
- new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
- }
-
- /** This version of outerJoinGemfireRegion(...) is just for Java API. */
- private[connector] def outerJoinGemfireRegion[K, V](
- regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
- outerJoinGemfireRegion(regionPath, func.call _, connConf)
- }
-
- private[connector] def defaultConnectionConf: GemFireConnectionConf =
- GemFireConnectionConf(rdd.sparkContext.getConf)
-
-}
-
-