Posted to commits@datafu.apache.org by ey...@apache.org on 2019/02/26 12:13:16 UTC
[datafu] 02/02: Add ScalaPython bridge and joinWithRangeAndDedup
This is an automated email from the ASF dual-hosted git repository.
eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git
commit 36dfef565bd0de8521e95dfc60b820de14fbc36f
Author: Ohad Raviv <or...@paypal.com>
AuthorDate: Tue Feb 26 14:08:49 2019 +0200
Add ScalaPython bridge and joinWithRangeAndDedup
Signed-off-by: Eyal Allweil <ey...@apache.org>
---
.gitignore | 2 +-
datafu-spark/build.gradle | 2 +-
.../META-INF/services/datafu.spark.PythonResource | 1 +
.../src/main/resources/pyspark_utils/__init__.py | 0
.../main/resources/pyspark_utils/bridge_utils.py | 55 ++++++++
.../src/main/resources/pyspark_utils/df_utils.py | 144 +++++++++++++++++++
.../resources/pyspark_utils/init_spark_context.py | 5 +
.../src/main/scala/datafu/spark/DataFrameOps.scala | 4 +
.../scala/datafu/spark/PythonPathsManager.scala | 147 ++++++++++++++++++++
.../scala/datafu/spark/ScalaPythonBridge.scala | 152 +++++++++++++++++++++
.../src/main/scala/datafu/spark/SparkDFUtils.scala | 141 +++++++++++++------
.../spark/utils/overwrites/SparkPythonRunner.scala | 140 +++++++++++++++++++
.../META-INF/services/datafu.spark.PythonResource | 3 +
.../built_in_pyspark_lib/py4j-0.10.6-src.zip | Bin 0 -> 80352 bytes
.../resources/built_in_pyspark_lib/pyspark.zip | Bin 0 -> 541536 bytes
.../src/test/resources/example_tests/__init__.py | 0
.../test/resources/example_tests/df_utils_tests.py | 71 ++++++++++
.../test/resources/example_tests/pyfromscala.py | 74 ++++++++++
.../example_tests/pyfromscala_with_error.py | 3 +
datafu-spark/src/test/resources/log4j.properties | 5 +
datafu-spark/src/test/resources/text.csv | 5 +
.../datafu/spark/PySparkLibTestResources.scala | 28 ++++
.../scala/datafu/spark/TestScalaPythonBridge.scala | 111 +++++++++++++++
.../test/scala/datafu/spark/TestSparkDFUtils.scala | 36 ++++-
gradle.properties | 3 +-
25 files changed, 1084 insertions(+), 48 deletions(-)
diff --git a/.gitignore b/.gitignore
index 96aeaa0..9a2f99a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,4 @@ gradlew*
datafu-scala-spark/out
datafu-scala-spark/derby.log
datafu-spark/spark-warehouse
-datafu-spark/metastore_db
+datafu-spark/metastore_db
\ No newline at end of file
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index 3a9d181..867ca42 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -37,7 +37,7 @@ allprojects {
}
}
-archivesBaseName = 'datafu-spark_' + scalaVersion
+archivesBaseName = 'datafu-spark_' + scalaVersion + '_' + sparkVersion
import groovy.xml.MarkupBuilder
diff --git a/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource b/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource
new file mode 100644
index 0000000..a029545
--- /dev/null
+++ b/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource
@@ -0,0 +1 @@
+datafu.spark.CoreBridgeDirectory
\ No newline at end of file
diff --git a/datafu-spark/src/main/resources/pyspark_utils/__init__.py b/datafu-spark/src/main/resources/pyspark_utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
new file mode 100644
index 0000000..69927d3
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -0,0 +1,55 @@
+import os
+
+from py4j.java_gateway import JavaGateway, GatewayClient
+from pyspark.conf import SparkConf
+from pyspark.context import SparkContext
+from pyspark.sql import SparkSession
+
+# use jvm gateway to create a java class instance by full-qualified class name
+def _getjvm_class(gateway, fullClassName):
+ return gateway.jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(fullClassName).newInstance()
+
+
+class Context(object):
+
+ def __init__(self):
+ from py4j.java_gateway import java_import
+ """When running a Python script from Scala - this function is called
+ by the script to initialize the connection to the Java Gateway and get the spark context.
+ code is basically copied from:
+ https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
+ """
+
+ if os.environ.get("SPARK_EXECUTOR_URI"):
+ SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
+
+ gateway = JavaGateway(GatewayClient(port=int(os.environ.get("PYSPARK_GATEWAY_PORT"))), auto_convert=True)
+ java_import(gateway.jvm, "org.apache.spark.SparkEnv")
+ java_import(gateway.jvm, "org.apache.spark.SparkConf")
+ java_import(gateway.jvm, "org.apache.spark.api.java.*")
+ java_import(gateway.jvm, "org.apache.spark.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+
+ intp = gateway.entry_point
+
+ jSparkSession = intp.pyGetSparkSession()
+ jsc = intp.pyGetJSparkContext(jSparkSession)
+ jconf = intp.pyGetSparkConf(jsc)
+ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
+ self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+
+ # Spark 2
+ self.sparkSession = SparkSession(self.sc, jSparkSession)
+ self.sqlContext = self.sparkSession._wrapped
+
+ctx = None
+
+
+def get_contexts():
+ global ctx
+ if not ctx:
+ ctx = Context()
+
+ return ctx.sc, ctx.sqlContext, ctx.sparkSession
diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
new file mode 100644
index 0000000..746ae44
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -0,0 +1,144 @@
+from pyspark.sql import DataFrame
+
+from pyspark_utils.bridge_utils import _getjvm_class
+
+
+class PySparkDFUtils(object):
+
+ def __init__(self):
+ self._sc = None
+
+ def _initSparkContext(self, sc, sqlContext):
+ self._sc = sc
+ self._sqlContext = sqlContext
+ self._gateway = sc._gateway
+
+ def _get_jvm_spark_utils(self):
+ jvm_utils = _getjvm_class(self._gateway, "datafu.spark.SparkDFUtilsBridge")
+ return jvm_utils
+
+ # public:
+
+ def dedup(self, dataFrame, groupCol, orderCols = []):
+ """
+ Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ :param dataFrame: DataFrame to operate on
+ :param groupCol: column to group by the records
+ :param orderCols: columns to order the records according to.
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+ java_cols = self._cols_to_java_cols(orderCols)
+ jdf = self._get_jvm_spark_utils().dedup(dataFrame._jdf, groupCol._jc, java_cols)
+ return DataFrame(jdf, self._sqlContext)
+
+ def dedupTopN(self, dataFrame, n, groupCol, orderCols = []):
+ """
+ Used to get the top N records (after ordering according to the provided order columns) in each group.
+ :param dataFrame: DataFrame to operate on
+ :param n: number of records to return from each group
+ :param groupCol: column to group by the records
+ :param orderCols: columns to order the records according to
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+ java_cols = self._cols_to_java_cols(orderCols)
+ jdf = self._get_jvm_spark_utils().dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
+ return DataFrame(jdf, self._sqlContext)
+
+ def dedup2(self, dataFrame, groupCol, orderByCol, desc = True, columnsFilter = [], columnsFilterKeep = True):
+ """
+ Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ :param dataFrame: DataFrame to operate on
+ :param groupCol: column to group by the records
+ :param orderByCol: column to order the records according to
+ :param desc: whether the ordering is descending
+ :param columnsFilter: columns to filter
+ :param columnsFilterKeep: indicates whether to filter the selected columns 'out' or alternatively keep only
+ those columns in the result
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+ jdf = self._get_jvm_spark_utils().dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
+ return DataFrame(jdf, self._sqlContext)
+
+ def changeSchema(self, dataFrame, newScheme = []):
+ """
+ Returns a DataFrame with the column names renamed to the column names in the new schema
+ :param dataFrame: DataFrame to operate on
+ :param newScheme: new column names
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+ jdf = self._get_jvm_spark_utils().changeSchema(dataFrame._jdf, newScheme)
+ return DataFrame(jdf, self._sqlContext)
+
+ def joinSkewed(self, dfLeft, dfRight, joinExprs, numShards = 30, joinType= "inner"):
+ """
+ Used to perform a join when the right df is relatively small but still too large for a broadcast join.
+ Use cases:
+ a. excluding keys that might be skewed, given a medium-sized list of such keys.
+ b. joining a big skewed table with a table that has a small number of very big rows.
+ :param dfLeft: left DataFrame
+ :param dfRight: right DataFrame
+ :param joinExprs: join expression
+ :param numShards: number of shards
+ :param joinType: join type
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(dfLeft._sc, dfLeft.sql_ctx)
+ utils = self._get_jvm_spark_utils()
+ jdf = utils.joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
+ return DataFrame(jdf, self._sqlContext)
+
+ def broadcastJoinSkewed(self, notSkewed, skewed, joinCol, numberCustsToBroadcast):
+ """
+ Suitable for performing a join when one DF is skewed and the other is not.
+ Splits both DataFrames into two parts according to the skewed keys.
+ 1. Map-join: broadcasts the skewed-keys part of the not skewed DF to the skewed-keys part of the skewed DF
+ 2. Regular join: between the remaining two parts.
+ :param notSkewed: not skewed DataFrame
+ :param skewed: skewed DataFrame
+ :param joinCol: join column
+ :param numberCustsToBroadcast: number of rows (skewed keys) to broadcast
+ :return: DataFrame representing the data after the operation
+ """
+ self._initSparkContext(skewed._sc, skewed.sql_ctx)
+ jdf = self._get_jvm_spark_utils().broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
+ return DataFrame(jdf, self._sqlContext)
+
+ def joinWithRange(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor):
+ """
+ Helper function to join a table that has a point column with a table that has a range of the same column.
+ For example, an IP table joined with whois data where each line holds a range of IPs.
+ The main problem this handles is that a naive explode on the range can result in a huge table.
+ requires:
+ 1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
+ 2. the range and single columns to be numeric.
+ """
+ self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
+ jdf = self._get_jvm_spark_utils().joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
+ return DataFrame(jdf, self._sqlContext)
+
+ def joinWithRangeAndDedup(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange):
+ """
+ Helper function to join a table that has a point column with a table that has a range of the same column.
+ For example, an IP table joined with whois data where each line holds a range of IPs.
+ The main problem this handles is that a naive explode on the range can result in a huge table.
+ requires:
+ 1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
+ 2. the range and single columns to be numeric.
+ """
+ self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
+ jdf = self._get_jvm_spark_utils().joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
+ return DataFrame(jdf, self._sqlContext)
+
+ def _cols_to_java_cols(self, cols):
+ return self._map_if_needed(lambda x: x._jc, cols)
+
+ def _dfs_to_java_dfs(self, dfs):
+ return self._map_if_needed(lambda x: x._jdf, dfs)
+
+ def _map_if_needed(self, func, itr):
+ return map(func, itr) if itr is not None else itr
+
diff --git a/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py b/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py
new file mode 100644
index 0000000..d879d03
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py
@@ -0,0 +1,5 @@
+
+from pyspark_utils.bridge_utils import get_contexts
+sc, sqlContext, spark = get_contexts()
+
+print "initiated contexts"
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index 6f5ae18..f1ac607 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -40,6 +40,10 @@ object DataFrameOps {
def joinWithRange(colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long = 2^8) =
SparkDFUtils.joinWithRange(df, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
+ def joinWithRangeAndDedup(colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long = 2^8, dedupSmallRange: Boolean = true): DataFrame = {
+ SparkDFUtils.joinWithRangeAndDedup(df, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR, dedupSmallRange)
+ }
+
def broadcastJoinSkewed(skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame = {
SparkDFUtils.broadcastJoinSkewed(df, skewed, joinCol, numberCustsToBroadcast)
}
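For reference, a minimal Scala sketch (not part of this commit's diff) of how the new joinWithRangeAndDedup method might be called through DataFrameOps, assuming the implicit conversion in DataFrameOps is brought into scope (the tests later in this diff call it the same way) and that an active SparkSession named spark exists; the data is illustrative only:

    import datafu.spark.DataFrameOps._

    // assumes an active SparkSession named spark
    val dfPoints = spark.createDataFrame(Seq(("a", 1), ("a", 3), ("b", 1)))
      .toDF("col_grp", "col_ord")
    val dfRanges = spark.createDataFrame(Seq((1, 2, "r1"), (1, 4, "r2")))
      .toDF("start", "end", "desc")

    // join each point to the ranges that contain it, then keep only one matching
    // range per point (the smallest one, since dedupSmallRange defaults to true)
    val joined = dfPoints.joinWithRangeAndDedup("col_ord", dfRanges, "start", "end")
    joined.show()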
diff --git a/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala b/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala
new file mode 100644
index 0000000..b451846
--- /dev/null
+++ b/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io.{File, IOException}
+import java.net.JarURLConnection
+import java.nio.file.Paths
+import java.util.{MissingResourceException, ServiceLoader}
+
+import org.apache.log4j.Logger
+
+import scala.collection.JavaConverters._
+
+/**
+ * Represents a resource that needs to be added to PYTHONPATH used by ScalaPythonBridge.
+ *
+ * To ensure your python resources (modules, files, etc.) are properly added to the bridge, do the following:
+ * 1) Put all the resources under some root directory with a unique name x, and make sure path/to/x
+ * is visible to the class loader (usually just use src/main/resources/x).
+ * 2) Extend this class like this:
+ * class MyResource extends PythonResource("x")
+ * This assumes x is under src/main/resources/x
+ * 3) (since we use ServiceLoader) Add a file to your jar/project:
+ * META-INF/services/datafu.spark.PythonResource
+ * with a single line containing the full name (including package) of MyResource.
+ *
+ * This process involves scanning the entire jar and copying files from the jar to some temporary location,
+ * so if your jar is really big consider putting the resources in a smaller jar.
+ *
+ * @param resourcePath Path to the resource, will be loaded via getClass.getClassLoader.getResource()
+ * @param isAbsolutePath Set to true if the resource is in some absolute path rather than in jar (try to avoid that).
+ */
+abstract class PythonResource(val resourcePath: String, val isAbsolutePath: Boolean = false)
+
+/**
+ * There are two phases of resolving python file paths:
+ *
+ * 1) When launching spark:
+ * the files need to be added to spark.executorEnv.PYTHONPATH
+ * on cluster mode this is handled by com.paypal.risk.ars.bigdata.execution_fw.hadoop.spark.SparkTask
+ * and on test mode by com.paypal.risk.ars.bigdata.hadoop.spark.utils.SparkTestManager
+ *
+ * 2) When executing python file via bridge:
+ * the files need to be added to the process PYTHONPATH. This is different from the previous phase because
+ * this python process is spawned by zonkey, not by spark, and always on the driver.
+ */
+object PythonPathsManager {
+
+ case class ResolvedResource(resource: PythonResource, resolvedLocation: String)
+
+ private val logger: Logger = Logger.getLogger(getClass)
+
+ val resources: Seq[ResolvedResource] =
+ ServiceLoader.load(classOf[PythonResource])
+ .asScala
+ .map(p => ResolvedResource(p, resolveDependencyLocation(p)))
+ .toSeq
+
+ logResolved
+
+ def getAbsolutePaths() = resources.map(_.resolvedLocation).distinct
+ def getAbsolutePathsForJava() = resources.map(_.resolvedLocation).distinct.asJava
+
+ def getPYTHONPATH(): String =
+ resources
+ .map(_.resolvedLocation)
+ .map(p => new File(p))
+ .map(_.getName) //get just the name of the file
+ .mkString(":")
+
+ private def resolveDependencyLocation(resource: PythonResource): String =
+ if (resource.isAbsolutePath) {
+ if (!new File(resource.resourcePath).exists())
+ throw new IOException("Could not find resource in absolute path: " + resource.resourcePath)
+ else {
+ logger.info("Using file absolute path: " + resource.resourcePath)
+ resource.resourcePath
+ }
+ } else {
+ Option(getClass.getClassLoader.getResource(resource.resourcePath)) match {
+ case None =>
+ logger.error("Didn't find resource in classpath! resource path: " + resource.resourcePath)
+ throw new MissingResourceException("Didn't find resource in classpath!", resource.getClass.getName, resource.resourcePath)
+ case Some(p) =>
+ p.toURI.getScheme match {
+ case "jar" =>
+ //if dependency is inside jar file, use jar file path:
+ val jarPath = new File(p.openConnection().asInstanceOf[JarURLConnection].getJarFileURL.toURI).getPath
+ logger.info(s"Dependency ${resource.resourcePath} found inside jar: " + jarPath)
+ jarPath
+ case "file" =>
+ val file = new File(p.getFile)
+ if (!file.exists()) {
+ logger.warn("Dependency not found, skipping: " + file.getPath)
+ null
+ }
+ else {
+ if (file.isDirectory) {
+ val t_path =
+ if (System.getProperty("os.name").toLowerCase().contains("win")&& p.getPath().startsWith("/")) {
+ val path = p.getPath.substring(1)
+ logger.warn(s"Fixing path for windows operating system! converted ${p.getPath} to $path")
+ path
+ }
+ else
+ p.getPath
+ val path = Paths.get(t_path)
+ logger.info(s"Dependency found as directory: ${t_path}\n\tusing parent path: ${path.getParent}")
+ path.getParent.toString
+ } else {
+ logger.info("Dependency found as a file: " + p.getPath)
+ p.getPath
+ }
+ }
+ }
+ }
+ }
+
+ private def logResolved = {
+ logger.info(
+ s"Discovered ${resources.size} python paths:\n" +
+ resources
+ .map(p => s"className: ${p.resource.getClass.getName}\n\tresource: ${p.resource.resourcePath}\n\tlocation: ${p.resolvedLocation}")
+ .mkString("\n")) + "\n\n"
+ }
+}
+
+/**
+ * Contains all python files needed by the bridge itself
+ */
+class CoreBridgeDirectory extends PythonResource("pyspark_utils")
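For reference, a minimal sketch (not part of this commit's diff) of registering additional python files with the bridge, following the steps described in the PythonResource scaladoc above; the package, class and directory names here are hypothetical:

    package com.example.pyresources

    import datafu.spark.PythonResource

    // 1) the scripts live under src/main/resources/my_py_files (visible to the class loader)
    // 2) expose that directory to the bridge
    class MyPyFiles extends PythonResource("my_py_files")

    // 3) add a service file so the ServiceLoader in PythonPathsManager can find it:
    //    META-INF/services/datafu.spark.PythonResource
    //    containing the single line: com.example.pyresources.MyPyFiles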
diff --git a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
new file mode 100644
index 0000000..8b9cf82
--- /dev/null
+++ b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io._
+import java.net.URL
+import java.nio.file.Files
+import java.util.UUID
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.deploy.SparkPythonRunner
+import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
+
+
+case class ScalaPythonBridgeRunner(extraPath: String = "") {
+
+ val logger = LoggerFactory.getLogger(this.getClass)
+ //for the bridge we take the full resolved location, since this runs on the driver where the files are local:
+ logger.info("constructing PYTHONPATH")
+ val pythonPath = (PythonPathsManager.getAbsolutePaths() ++
+ Array("pyspark.zip", "py4j-0.10.6-src.zip") ++
+ Option(extraPath).getOrElse("").split(",")).distinct
+
+ logger.info("Bridge PYTHONPATH: " + pythonPath.mkString(":"))
+
+ val runner = SparkPythonRunner(pythonPath.mkString(","))
+
+ def runPythonFile(filename: String): String = {
+ val pyScript = resolveRunnableScript(filename)
+ // this.getClass.getClassLoader.asInstanceOf[URLClassLoader].getURLs.foreach(s => logger.info("class: " + s))
+ logger.info(s"Running python file $pyScript")
+ runner.runPyFile(pyScript)
+ }
+
+ def runPythonString(str: String): String = {
+ val tmpFile = writeToTempFile(str, "pyspark-tmp-file-", ".py")
+ logger.info("Running tmp PySpark file: " + tmpFile.getAbsolutePath + " with content:\n" + str)
+ runner.runPyFile(tmpFile.getAbsolutePath)
+ }
+
+ private def resolveRunnableScript(path: String): String = {
+ logger.info("Resolving python script location for: " + path)
+
+ val res: String = Option(this.getClass.getClassLoader.getResource(path)) match {
+ case None =>
+ logger.info("Didn't find script via classLoader, using as is: " + path)
+ path
+ case Some(resource) =>
+ resource.toURI.getScheme match {
+ case "jar" =>
+ //if inside jar, extract it and return cloned file:
+ logger.info("Script found inside jar, extracting...")
+ val outputFile = ResourceCloning.cloneResource(resource, path)
+ logger.info("Extracted file path: " + outputFile.getPath)
+ outputFile.getPath
+ case _ =>
+ logger.info("Using script original path: " + resource.getPath)
+ resource.getPath
+ }
+ }
+ res
+ }
+
+ private def writeToTempFile(contents: String, prefix: String, suffix: String): File = {
+ val tempFi = File.createTempFile(prefix, suffix)
+ tempFi.deleteOnExit()
+ val bw = new BufferedWriter(new FileWriter(tempFi))
+ bw.write(contents)
+ bw.close()
+ tempFi
+ }
+
+}
+
+/**
+ * Do not instantiate this class! Use the companion object instead.
+ * This class should only be used by python
+ */
+object ScalaPythonBridge { //need empty ctor for py4j gateway
+
+ /** Members used to allow a python script to share context with the main Scala program calling it.
+ * The python script calls:
+ * sc, sqlContext, spark = utils.get_contexts()
+ * and our python util function get_contexts
+ * uses the following to create python wrappers around the Java SparkContext and SQLContext.
+ */
+ // Called by python util get_contexts()
+ def pyGetSparkSession(): SparkSession = SparkSession.builder().getOrCreate()
+ def pyGetJSparkContext(sparkSession: SparkSession): JavaSparkContext = new JavaSparkContext(sparkSession.sparkContext)
+ def pyGetSparkConf(jsc: JavaSparkContext): SparkConf = jsc.getConf
+
+}
+
+/**
+ * Utility for extracting resource from a jar and copy it to a temporary location
+ */
+object ResourceCloning {
+
+ private val logger = LoggerFactory.getLogger(this.getClass)
+
+ val uuid = UUID.randomUUID().toString.substring(6)
+ val outputTempDir = new File(System.getProperty("java.io.tmpdir"), s"risk_tmp/$uuid/cloned_resources/")
+ forceMkdirs(outputTempDir)
+
+ def cloneResource(resource: URL, outputFileName: String): File = {
+ val outputTmpFile = new File(outputTempDir, outputFileName)
+ if (outputTmpFile.exists()) {
+ logger.info(s"resource $outputFileName already exists, skipping..")
+ outputTmpFile
+ } else {
+ logger.info("cloning resource: " + resource)
+ if (!outputTmpFile.exists()) { //it is possible that the file was already extracted in the session
+ forceMkdirs(outputTmpFile.getParentFile)
+ val inputStream = resource.openStream()
+ streamToFile(outputTmpFile, inputStream)
+ }
+ outputTmpFile
+ }
+ }
+
+ private def forceMkdirs(dir: File) =
+ if (!dir.exists() && !dir.mkdirs())
+ throw new IOException("Failed to create " + dir.getPath)
+
+ private def streamToFile(outputFile: File, inputStream: InputStream) = {
+ try {
+ Files.copy(inputStream, outputFile.toPath)
+ }
+ finally {
+ inputStream.close()
+ assert(outputFile.exists())
+ }
+ }
+}
\ No newline at end of file
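For reference, a minimal sketch (not part of this commit's diff) of driving the bridge from Scala, mirroring the pattern used in TestScalaPythonBridge further down in this diff; it assumes a SparkSession already exists in the JVM:

    import datafu.spark.ScalaPythonBridgeRunner

    // constructing the runner starts the py4j gateway and the python process;
    // the init script wires sc, sqlContext and spark on the python side via get_contexts()
    val runner = ScalaPythonBridgeRunner()
    runner.runPythonFile("pyspark_utils/init_spark_context.py")

    // run an ad-hoc snippet; temp tables registered in python are visible back in Scala
    runner.runPythonString("sqlContext.sql('select 1 as x').registerTempTable('from_python')")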
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index f22a13f..61386bd 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -20,16 +20,59 @@ package datafu.spark
import java.util.{List => JavaList}
-import org.apache.log4j.Logger
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, SparkOverwriteUDAFs, StructType}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.SizeEstimator
-object SparkDFUtils extends SparkDFUtilsTrait {
+/**
+ * class definition so we could expose this functionality in PySpark
+ */
+class SparkDFUtilsBridge {
+
+ def dedup(df: DataFrame, groupCol: Column, orderCols: JavaList[Column]): DataFrame = {
+ val converted = convertJavaListToSeq(orderCols)
+ SparkDFUtils.dedup(df = df, groupCol = groupCol, orderCols = converted: _*)
+ }
+
+ def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: JavaList[Column]): DataFrame = {
+ val converted = convertJavaListToSeq(orderCols)
+ SparkDFUtils.dedupTopN(df = df, n = n, groupCol = groupCol, orderCols = converted: _*)
+ }
+
+ def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean, columnsFilter: JavaList[String], columnsFilterKeep: Boolean): DataFrame = {
+ val columnsFilter_converted = convertJavaListToSeq(columnsFilter)
+ SparkDFUtils.dedup2(df = df, groupCol = groupCol, orderByCol = orderByCol, desc = desc, moreAggFunctions = Nil, columnsFilter = columnsFilter_converted, columnsFilterKeep = columnsFilterKeep)
+ }
+
+ def changeSchema(df: DataFrame, newScheme: JavaList[String]): DataFrame = {
+ val newScheme_converted = convertJavaListToSeq(newScheme)
+ SparkDFUtils.changeSchema(df = df, newScheme = newScheme_converted: _*)
+ }
+
+ def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int, joinType: String): DataFrame = {
+ SparkDFUtils.joinSkewed(dfLeft = dfLeft, dfRight = dfRight, joinExprs = joinExprs, numShards = numShards, joinType = joinType)
+ }
+
+ def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
+ SparkDFUtils.broadcastJoinSkewed(notSkewed = notSkewed, skewed = skewed, joinCol = joinCol, numRowsToBroadcast = numRowsToBroadcast)
+ }
+
+ def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+ SparkDFUtils.joinWithRange(dfSingle = dfSingle, colSingle = colSingle, dfRange = dfRange, colRangeStart = colRangeStart, colRangeEnd = colRangeEnd, DECREASE_FACTOR = DECREASE_FACTOR)
+ }
+
+ def joinWithRangeAndDedup(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long, dedupSmallRange: Boolean): DataFrame = {
+ SparkDFUtils.joinWithRangeAndDedup(dfSingle = dfSingle, colSingle = colSingle, dfRange = dfRange, colRangeStart = colRangeStart, colRangeEnd = colRangeEnd, DECREASE_FACTOR = DECREASE_FACTOR, dedupSmallRange = dedupSmallRange)
+ }
+
+ private def convertJavaListToSeq[T](list: JavaList[T]): Seq[T] = {
+ scala.collection.JavaConverters.asScalaIteratorConverter(list.iterator()).asScala.toList
+ }
+}
+
+object SparkDFUtils {
/**
* Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
@@ -40,19 +83,21 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* @param orderCols columns to order the records according to
* @return DataFrame representing the data after the operation
*/
- override def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
+ def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
+ df.dropDuplicates()
dedupTopN(df, 1, groupCol, orderCols: _*)
}
/**
* Used get the top N records (after ordering according to the provided order columns) in each group.
+ *
* @param df DataFrame to operate on
* @param n number of records to return from each group
* @param groupCol column to group by the records
* @param orderCols columns to order the records according to
* @return DataFrame representing the data after the operation
*/
- override def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
+ def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
val w = Window.partitionBy(groupCol).orderBy(orderCols: _*)
df.withColumn("rn", row_number.over(w)).where(col("rn") <= n).drop("rn")
}
@@ -71,7 +116,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* those columns in the result
* @return DataFrame representing the data after the operation
*/
- override def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean = true, moreAggFunctions: Seq[Column] = Nil, columnsFilter: Seq[String] = Nil, columnsFilterKeep: Boolean = true): DataFrame = {
+ def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean = true, moreAggFunctions: Seq[Column] = Nil, columnsFilter: Seq[String] = Nil, columnsFilterKeep: Boolean = true): DataFrame = {
val newDF = if (columnsFilter == Nil)
df.withColumn("sort_by_column", orderByCol)
else {
@@ -81,7 +126,6 @@ object SparkDFUtils extends SparkDFUtilsTrait {
df.select(df.columns.filter(colName => !columnsFilter.contains(colName)).map(colName => new Column(colName)):_*).withColumn("sort_by_column", orderByCol)
}
-
val aggFunc = if (desc) SparkOverwriteUDAFs.maxValueByKey(_:Column, _:Column)
else SparkOverwriteUDAFs.minValueByKey(_:Column, _:Column)
@@ -115,7 +159,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* @param colName column name for a column of type StructType
* @return DataFrame representing the data after the operation
*/
- override def flatten(df: DataFrame, colName: String): DataFrame = {
+ def flatten(df: DataFrame, colName: String): DataFrame = {
assert(df.schema(colName).dataType.isInstanceOf[StructType], s"Column $colName must be of type Struct")
val outerFields = df.schema.fields.map(_.name).toSet
val flattenFields = df.schema(colName).dataType.asInstanceOf[StructType].fields.filter(f => !outerFields.contains(f.name)).map("`" + colName + "`.`" + _.name + "`")
@@ -124,11 +168,12 @@ object SparkDFUtils extends SparkDFUtilsTrait {
/**
* Returns a DataFrame with the column names renamed to the column names in the new schema
+ *
* @param df DataFrame to operate on
* @param newScheme new column names
* @return DataFrame representing the data after the operation
*/
- override def changeSchema(df: DataFrame, newScheme: String*): DataFrame =
+ def changeSchema(df: DataFrame, newScheme: String*): DataFrame =
df.select(df.columns.zip(newScheme).map {case (oldCol: String, newCol: String) => col(oldCol).as(newCol)}: _*)
/**
@@ -144,7 +189,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* @param joinType join type
* @return joined DataFrame
*/
- override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
+ def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
// skew join based on salting
// salts the left DF by adding another random column and join with the right DF after duplicating it
val ss = dfLeft.sparkSession
@@ -158,13 +203,14 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* splits both of the DFs to two parts according to the skewed keys.
* 1. Map-join: broadcasts the skewed-keys part of the not skewed DF to the skewed-keys part of the skewed DF
* 2. Regular join: between the remaining two parts.
+ *
* @param notSkewed not skewed DataFrame
* @param skewed skewed DataFrame
* @param joinCol join column
* @param numRowsToBroadcast num of rows to broadcast
* @return DataFrame representing the data after the operation
*/
- override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
+ def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
val ss = notSkewed.sparkSession
import ss.implicits._
val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numRowsToBroadcast).drop("count")
@@ -212,11 +258,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* +----------+---------+----------+
*
* OUTPUT:
- * +-------+----------+---------+-------+
- * |time |start_time|end_time |desc |
- * +-------+----------+---------+-------+
- * |11:55 |11:50 |12:15 | lunch |
- * +-------+----------+---------+-------+
+ * +-------+----------+---------+---------+
+ * |time |start_time|end_time |desc |
+ * +-------+----------+---------+---------+
+ * |11:55 |10:00 |12:00 | meeting |
+ * +-------+----------+---------+---------+
+ * |11:55 |11:50 |12:15 | lunch |
+ * +-------+----------+---------+---------+
*
* @param dfSingle - DataFrame that contains the point column
* @param colSingle - the point column's name
@@ -226,43 +274,58 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* @param DECREASE_FACTOR - resolution factor. instead of exploding the range column directly, we first decrease its resolution by this factor
* @return
*/
- override def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+ def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+ val dfJoined = joinWithRangeInternal(dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
+ dfJoined.drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
+ }
+
+ private def joinWithRangeInternal(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
import org.apache.spark.sql.functions.udf
val rangeUDF = udf((start: Long, end: Long) => (start to end).toArray)
val dfRange_exploded = dfRange.withColumn("range_start", col(colRangeStart).cast(LongType))
.withColumn("range_end" , col(colRangeEnd).cast(LongType))
.withColumn("decreased_range_single", explode(rangeUDF(col("range_start")/lit(DECREASE_FACTOR),
- col("range_end" )/lit(DECREASE_FACTOR))))
+ col("range_end") /lit(DECREASE_FACTOR))))
- val dfJoined =
dfSingle.withColumn("single", floor(col(colSingle).cast(LongType)))
.withColumn("decreased_single", floor(col(colSingle).cast(LongType)/lit(DECREASE_FACTOR)))
.join(dfRange_exploded, col("decreased_single") === col("decreased_range_single"), "left_outer")
.withColumn("range_size", expr("(range_end - range_start + 1)"))
.filter("single>=range_start and single<=range_end")
-
- dedup2(dfJoined, col(colSingle), col("range_size"), desc = false)
- .drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
-
}
-}
-
-
-trait SparkDFUtilsTrait {
- def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame
-
- def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame
- def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean, moreAggFunctions: Seq[Column], columnsFilter: Seq[String], columnsFilterKeep: Boolean): DataFrame
-
- def flatten(df: DataFrame, colName: String): DataFrame
-
- def changeSchema(df: DataFrame, newScheme: String*): DataFrame
+ /**
+ * Run joinWithRange and afterwards run dedup
+ *
+ * @param dedupSmallRange - by small/large range
+ *
+ * OUTPUT for dedupSmallRange = "true":
+ * +-------+----------+---------+---------+
+ * |time |start_time|end_time |desc |
+ * +-------+----------+---------+---------+
+ * |11:55 |11:50 |12:15 | lunch |
+ * +-------+----------+---------+---------+
+ *
+ * OUTPUT for dedupSmallRange = "false":
+ * +-------+----------+---------+---------+
+ * |time |start_time|end_time |desc |
+ * +-------+----------+---------+---------+
+ * |11:55 |10:00 |12:00 | meeting |
+ * +-------+----------+---------+---------+
+ *
+ */
+ def joinWithRangeAndDedup(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long, dedupSmallRange: Boolean): DataFrame = {
- def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int, joinType: String): DataFrame
+ val dfJoined = joinWithRangeInternal(dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
- def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame
+ // "range_start" is here for consistency
+ val dfDeduped = if (dedupSmallRange) {
+ dedup2(dfJoined, col(colSingle), struct("range_size", "range_start"), desc = false)
+ } else {
+ dedup2(dfJoined, col(colSingle), struct(expr("-range_size"), col("range_start")), desc = true)
+ }
- def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame
+ dfDeduped.drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
+ }
}
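For reference, a minimal sketch (not part of this commit's diff) of calling the dedup helpers on the SparkDFUtils object directly; the data is illustrative and spark is assumed to be an active SparkSession:

    import datafu.spark.SparkDFUtils
    import org.apache.spark.sql.functions.col

    val people = spark.createDataFrame(Seq(
        ("a", "Alice", 34), ("a", "Sara", 33), ("b", "Bob", 36)))
      .toDF("id", "name", "age")

    // one row per id, keeping the row with the highest age (ties broken by name)
    val latest = SparkDFUtils.dedup(people, col("id"), col("age").desc, col("name").desc)
    latest.show()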
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
new file mode 100644
index 0000000..3eed62c
--- /dev/null
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.io._
+
+import datafu.spark.ScalaPythonBridge
+import org.apache.log4j.Logger
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.Utils
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * We wrap Spark's PythonRunner because we ran into premature python process termination.
+ * In PythonRunner the python process exits immediately when it has finished reading the file name;
+ * this caused Accumulator exceptions when the driver tried to fetch accumulator data
+ * from the python gateway.
+ * Instead, like in Zeppelin, we create an "interactive" python process, feed it the python
+ * script and do not close the gateway.
+ */
+case class SparkPythonRunner(pyPaths: String, otherArgs: Array[String] = Array()) {
+
+ val logger: Logger = Logger.getLogger(getClass)
+ val (reader, writer, process) = initPythonEnv()
+
+ def runPyFile(pythonFile: String): String = {
+
+ val formattedPythonFile = PythonRunner.formatPath(pythonFile)
+ execFile(formattedPythonFile, writer, reader)
+
+ }
+
+ private def initPythonEnv(): (BufferedReader, BufferedWriter, Process) = {
+
+ val pythonExec =
+ sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
+
+ // Format python filename paths before adding them to the PYTHONPATH
+ val formattedPyFiles = PythonRunner.formatPaths(pyPaths)
+
+ // Launch a Py4J gateway server for the process to connect to; this will let it see our
+ // Java system properties and such
+ val gatewayServer = new py4j.GatewayServer(ScalaPythonBridge, 0)
+ val thread = new Thread(new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ gatewayServer.start()
+ }
+ })
+ thread.setName("py4j-gateway-init")
+ thread.setDaemon(true)
+ thread.start()
+
+ // Wait until the gateway server has started, so that we know which port it is bound to.
+ // `gatewayServer.start()` will start a new thread and run the server code there, after
+ // initializing the socket, so the thread started above will end as soon as the server is
+ // ready to serve connections.
+ thread.join()
+
+ // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+ // python directories in SPARK_HOME (if set), and any files in the pyPaths argument
+ val pathElements = new ArrayBuffer[String]
+ pathElements ++= formattedPyFiles
+ pathElements += PythonUtils.sparkPythonPath
+ pathElements += sys.env.getOrElse("PYTHONPATH", "")
+ val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+ logger.info(s"Running python with PYTHONPATH:\n\t${formattedPyFiles.mkString(",")}")
+
+ // Launch Python process
+ //val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
+ val builder = new ProcessBuilder((Seq(pythonExec, "-iu") ++ otherArgs).asJava)
+ val env = builder.environment()
+ env.put("PYTHONPATH", pythonPath)
+ // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+ env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
+ env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+ builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ //try {
+ val process = builder.start()
+ //new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+ val writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream))
+ val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
+
+ // val exitCode = process.waitFor()
+ // if (exitCode != 0) {
+ // throw new SparkUserAppException(exitCode)
+ // }
+ //} finally {
+ // gatewayServer.shutdown()
+ //}
+
+ (reader, writer, process)
+ }
+
+
+ private def execFile(filename: String, writer: BufferedWriter, reader: BufferedReader): String = {
+ writer.write("import traceback\n")
+ writer.write("try:\n")
+ writer.write(" execfile('"+filename+"')\n")
+ writer.write(" print (\"*!?flush reader!?*\")\n")
+ writer.write("except Exception as e:\n")
+ writer.write(" traceback.print_exc()\n")
+ writer.write(" print (\"*!?flush error reader!?*\")\n\n")
+// writer.write(" exit(1)\n\n")
+ writer.flush()
+ var output = ""
+ var line: String = reader.readLine
+ while (!line.contains("*!?flush reader!?*") && !line.contains("*!?flush error reader!?*")) {
+ //logger.debug("Read line from python shell : " + line)
+ System.out.println(line)
+ if (line == "...") {
+ //logger.warn("Syntax error ! ")
+ output += "Syntax error ! "
+ }
+ output += "\r" + line + "\n"
+ line = reader.readLine
+ }
+
+ if (line.contains("*!?flush error reader!?*"))
+ throw new RuntimeException("python bridge error: " + output)
+
+ output
+ }
+
+}
diff --git a/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource b/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource
new file mode 100644
index 0000000..1b6add8
--- /dev/null
+++ b/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource
@@ -0,0 +1,3 @@
+datafu.spark.ExampleFiles
+datafu.spark.Py4JResource
+datafu.spark.PysparkResource
\ No newline at end of file
diff --git a/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip b/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip
new file mode 100644
index 0000000..2f8edcc
Binary files /dev/null and b/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip differ
diff --git a/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip b/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip
new file mode 100644
index 0000000..b6d1e63
Binary files /dev/null and b/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip differ
diff --git a/datafu-spark/src/test/resources/example_tests/__init__.py b/datafu-spark/src/test/resources/example_tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/datafu-spark/src/test/resources/example_tests/df_utils_tests.py b/datafu-spark/src/test/resources/example_tests/df_utils_tests.py
new file mode 100644
index 0000000..304db02
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/df_utils_tests.py
@@ -0,0 +1,71 @@
+import os
+import sys
+from pprint import pprint as p
+
+from pyspark_utils.df_utils import PySparkDFUtils
+
+p('CHECKING IF PATHS EXISTS:')
+for x in sys.path:
+ p('PATH ' + x + ': ' + str(os.path.exists(x)))
+
+
+df_utils = PySparkDFUtils()
+
+df_people = sqlContext.createDataFrame([
+ ("a", "Alice", 34),
+ ("a", "Sara", 33),
+ ("b", "Bob", 36),
+ ("b", "Charlie", 30),
+ ("c", "David", 29),
+ ("c", "Esther", 32),
+ ("c", "Fanny", 36),
+ ("c", "Zoey", 36)],
+ ["id", "name", "age"])
+
+func_dedup_res = df_utils.dedup(dataFrame=df_people, groupCol=df_people.id,
+ orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedup_res.registerTempTable("dedup")
+
+func_dedupTopN_res = df_utils.dedupTopN(dataFrame=df_people, n=2, groupCol=df_people.id,
+ orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedupTopN_res.registerTempTable("dedupTopN")
+
+func_dedup2_res = df_utils.dedup2(dataFrame=df_people, groupCol=df_people.id, orderByCol=df_people.age, desc=True,
+ columnsFilter=["name"], columnsFilterKeep=False)
+func_dedup2_res.registerTempTable("dedup2")
+
+func_changeSchema_res = df_utils.changeSchema(dataFrame=df_people, newScheme=["id1", "name1", "age1"])
+func_changeSchema_res.registerTempTable("changeSchema")
+
+df_people2 = sqlContext.createDataFrame([
+ ("a", "Laura", 34),
+ ("a", "Stephani", 33),
+ ("b", "Margaret", 36)],
+ ["id", "name", "age"])
+
+simpleDF = sqlContext.createDataFrame([
+ ("a", "1")],
+ ["id", "value"])
+from pyspark.sql.functions import expr
+
+func_joinSkewed_res = df_utils.joinSkewed(dfLeft=df_people2.alias("df1"), dfRight=simpleDF.alias("df2"),
+ joinExprs=expr("df1.id == df2.id"), numShards=5,
+ joinType="inner")
+func_joinSkewed_res.registerTempTable("joinSkewed")
+
+func_broadcastJoinSkewed_res = df_utils.broadcastJoinSkewed(notSkewed=df_people2, skewed=simpleDF, joinCol="id",
+ numberCustsToBroadcast=5)
+func_broadcastJoinSkewed_res.registerTempTable("broadcastJoinSkewed")
+
+dfRange = sqlContext.createDataFrame([
+ ("a", 34, 36)],
+ ["id1", "start", "end"])
+func_joinWithRange_res = df_utils.joinWithRange(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
+ colRangeStart="start", colRangeEnd="end",
+ decreaseFactor=5)
+func_joinWithRange_res.registerTempTable("joinWithRange")
+
+func_joinWithRangeAndDedup_res = df_utils.joinWithRangeAndDedup(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
+ colRangeStart="start", colRangeEnd="end",
+ decreaseFactor=5, dedupSmallRange=True)
+func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
diff --git a/datafu-spark/src/test/resources/example_tests/pyfromscala.py b/datafu-spark/src/test/resources/example_tests/pyfromscala.py
new file mode 100644
index 0000000..2ba99cd
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/pyfromscala.py
@@ -0,0 +1,74 @@
+
+# print the PYTHONPATH
+import sys
+from pprint import pprint as p
+p(sys.path)
+
+from pyspark.sql import functions as F
+
+
+import os
+print os.getcwd()
+
+
+###############################################################
+# query scala defined DF
+###############################################################
+dfout = sqlContext.sql("select num * 2 as d from dfin")
+dfout.registerTempTable("dfout")
+dfout.groupBy(dfout['d']).count().show()
+sqlContext.sql("select count(*) as cnt from dfout").show()
+dfout.groupBy(dfout['d']).agg(F.count(F.col('d')).alias('cnt')).show()
+
+sqlContext.sql("select d * 4 as d from dfout").registerTempTable("dfout2")
+
+
+###############################################################
+# check python UDFs
+###############################################################
+
+def magic_func(s):
+
+ return s + " magic"
+
+sqlContext.udf.register("magic", magic_func)
+
+
+###############################################################
+# check sc.textFile
+###############################################################
+
+DEL = '\x10'
+
+from pyspark.sql.types import StructType, StructField
+from pyspark.sql.types import StringType
+
+schema = StructType([
+ StructField("A", StringType()),
+ StructField("B", StringType())
+])
+
+txt_df = sqlContext.read.csv('src/test/resources/text.csv', sep=DEL, schema=schema)
+
+print type(txt_df)
+print dir(txt_df)
+print txt_df.count()
+
+txt_df.show()
+
+txt_df2 = sc.textFile('src/test/resources/text.csv').map(lambda x: x.split(DEL)).toDF()
+txt_df2.show()
+
+
+###############################################################
+# convert python dict to DataFrame
+###############################################################
+
+d = {'a': 0.1, 'b': 2}
+d = [(k,1.0*d[k]) for k in d]
+stats_df = sc.parallelize(d, 1).toDF(["name", "val"])
+stats_df.registerTempTable('stats')
+
+sqlContext.table("stats").show()
+
+
diff --git a/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py b/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py
new file mode 100644
index 0000000..a2ba4e5
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py
@@ -0,0 +1,3 @@
+
+
+sqlContext.sql("select * from edw.table_not_exists")
diff --git a/datafu-spark/src/test/resources/log4j.properties b/datafu-spark/src/test/resources/log4j.properties
new file mode 100644
index 0000000..7422151
--- /dev/null
+++ b/datafu-spark/src/test/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootCategory=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %5p %c{3} - %m%n
diff --git a/datafu-spark/src/test/resources/text.csv b/datafu-spark/src/test/resources/text.csv
new file mode 100644
index 0000000..c0d981d
--- /dev/null
+++ b/datafu-spark/src/test/resources/text.csv
@@ -0,0 +1,5 @@
+14
+52
+47
+38
+03
\ No newline at end of file
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
new file mode 100644
index 0000000..a68ee76
--- /dev/null
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+class PysparkResource extends PythonResource(PathsResolver.pyspark, true)
+
+class Py4JResource extends PythonResource(PathsResolver.py4j, true)
+
+object PathsResolver {
+ val pyspark = ResourceCloning.cloneResource(getClass.getResource("/built_in_pyspark_lib/pyspark.zip"), "pyspark_cloned.zip").getPath
+ val py4j = ResourceCloning.cloneResource(getClass.getResource("/built_in_pyspark_lib/py4j-0.10.6-src.zip"), "py4j_cloned.zip").getPath
+}
\ No newline at end of file
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
new file mode 100644
index 0000000..f3bea97
--- /dev/null
+++ b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.junit._
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import scala.util.Try
+
+object TestScalaPythonBridge {
+
+ def getNewRunner(): ScalaPythonBridgeRunner = {
+ val runner = ScalaPythonBridgeRunner()
+ runner.runPythonFile("pyspark_utils/init_spark_context.py")
+ runner
+ }
+
+ def getNewSparkSession(): SparkSession = {
+
+ val tempDir = Utils.createTempDir()
+ val localMetastorePath = new File(tempDir, "metastore").getCanonicalPath
+ val localWarehousePath = new File(tempDir, "wharehouse").getCanonicalPath
+ val pythonPath = PythonPathsManager.getAbsolutePaths().mkString(File.pathSeparator)
+ println("Creating SparkConf with PYTHONPATH: " + pythonPath)
+ val sparkConf = new SparkConf()
+ .setMaster("local[1]")
+ .set("spark.sql.warehouse.dir", localWarehousePath)
+ .set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastorePath;create=true")
+ //.set("datanucleus.rdbms.datastoreAdapterClassName", "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
+ .setExecutorEnv(Seq(("PYTHONPATH", pythonPath)))
+ .setAppName("Spark Unit Test")
+
+ val builder = SparkSession.builder().config(sparkConf).enableHiveSupport()
+ val spark = builder.getOrCreate()
+
+ spark
+ }
+}
+
+@RunWith(classOf[JUnitRunner])
+class TestScalaPythonBridge extends FunSuite {
+
+ private val spark = TestScalaPythonBridge.getNewSparkSession
+ private lazy val runner = TestScalaPythonBridge.getNewRunner()
+
+ def assertTable(tableName: String, expected: String): Unit =
+ Assert.assertEquals(expected, spark.table(tableName).collect().sortBy(_.toString).mkString(", "))
+
+
+ test("pyfromscala.py") {
+
+ import spark.implicits._
+
+ val dfin = spark.sparkContext.parallelize(1 to 10).toDF("num")
+ dfin.createOrReplaceTempView("dfin")
+
+ runner.runPythonFile("example_tests/pyfromscala.py")
+
+ // try to invoke python udf from scala code
+ assert(spark.sql("select magic('python_udf')").collect().mkString(",") == "[python_udf magic]")
+
+ //spark.sql("select * from dfout").registerTempTable("output")
+ assertTable("dfout", "[10], [12], [14], [16], [18], [20], [2], [4], [6], [8]")
+ assertTable("dfout2", "[16], [24], [32], [40], [48], [56], [64], [72], [80], [8]")
+ assertTable("stats", "[a,0.1], [b,2.0]")
+ }
+
+ test("pyfromscala_with_error.py") {
+ val t = Try(runner.runPythonFile("example_tests/pyfromscala_with_error.py"))
+ assert(t.isFailure)
+ assert(t.failed.get.isInstanceOf[RuntimeException])
+ }
+
+ test("SparkDFUtilsBridge") {
+ runner.runPythonFile("example_tests/df_utils_tests.py")
+ assertTable("dedup", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
+ assertTable("dedupTopN", "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,Fanny,36], [c,Zoey,36]")
+ assertTable("dedup2", "[a,34], [b,36], [c,36]")
+ assertTable("changeSchema", "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,David,29], [c,Esther,32], [c,Fanny,36], [c,Zoey,36]")
+ assertTable("joinSkewed", "[a,Laura,34,a,1], [a,Stephani,33,a,1]")
+ assertTable("broadcastJoinSkewed", "[a,Laura,34,1], [a,Stephani,33,1]")
+ assertTable("joinWithRange", "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
+ assertTable("joinWithRangeAndDedup", "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
+ }
+
+}
+
+class ExampleFiles extends PythonResource("example_tests")
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 861a1c0..d6fa0f6 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -126,23 +126,47 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
assertDataFrameEquals(expected, actual)
}
- case class expRangeJoin(col_grp:String, col_ord:Option[Int], col_str:String, start:Option[Int], end:Option[Int], desc:String)
-
+ case class expRangeJoin2(col_grp:String, col_ord:Option[Int], col_str:String, start:Option[Int], end:Option[Int], desc:String)
+ case class expRangeJoin1(col_grp:String, col_ord:Int, col_str:String, start:Option[Int], end:Option[Int], desc:String)
+
test("join_with_range") {
val df = sc.parallelize(List(("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4"))).toDF("col_grp", "col_ord", "col_str")
val dfr = sc.parallelize(List((1, 2,"asd1"), (1, 4, "asd2"), (3, 5,"asd3"), (3, 10,"asd4"))).toDF("start", "end", "desc")
val expected = sqlContext.createDataFrame(List(
- expRangeJoin("b",Option(1),"asd4",Option(1),Option(2),"asd1"),
- expRangeJoin("a",Option(3),"asd3",Option(3),Option(5),"asd3"),
- expRangeJoin("a",Option(2),"asd2",Option(1),Option(2),"asd1")
-
+ expRangeJoin1("a",1,"asd1",Option(1),Option(2),"asd1"),
+ expRangeJoin1("a",1,"asd1",Option(1),Option(4),"asd2"),
+ expRangeJoin1("a",2,"asd2",Option(1),Option(2),"asd1"),
+ expRangeJoin1("a",2,"asd2",Option(1),Option(4),"asd2"),
+ expRangeJoin1("a",3,"asd3",Option(1),Option(4),"asd2"),
+ expRangeJoin1("a",3,"asd3",Option(3),Option(5),"asd3"),
+ expRangeJoin1("a",3,"asd3",Option(3),Option(10),"asd4"),
+ expRangeJoin1("b",1,"asd4",Option(1),Option(2),"asd1"),
+ expRangeJoin1("b",1,"asd4",Option(1),Option(4),"asd2")
))
val actual = df.joinWithRange("col_ord", dfr, "start", "end")
+ actual.show()
assertDataFrameEquals(expected, actual)
}
+
+ test("join_with_range_and_dedup") {
+ val df = sc.parallelize(List(("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4"))).toDF("col_grp", "col_ord", "col_str")
+ val dfr = sc.parallelize(List((1, 2,"asd1"), (1, 4, "asd2"), (3, 5,"asd3"), (3, 10,"asd4"))).toDF("start", "end", "desc")
+
+ val expected = sqlContext.createDataFrame(List(
+ expRangeJoin2("b",Option(1),"asd4",Option(1),Option(2),"asd1"),
+ expRangeJoin2("a",Option(3),"asd3",Option(3),Option(5),"asd3"),
+ expRangeJoin2("a",Option(2),"asd2",Option(1),Option(2),"asd1")
+
+ ))
+
+ val actual = df.joinWithRangeAndDedup("col_ord", dfr, "start", "end")
+ actual.show()
+
+ assertDataFrameEquals(expected, actual)
+ }
test("broadcastJoinSkewed") {
val skewedList = List(("1", "a"), ("1", "b"), ("1", "c"), ("1", "d"), ("1", "e"),("2", "k"),("0", "k"))
diff --git a/gradle.properties b/gradle.properties
index 10ce0cf..05d1749 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,8 +16,9 @@
# under the License.
group=org.apache.datafu
-version=1.4.0
+version=1.5.0
gradleVersion=4.8.1
org.gradle.jvmargs="-XX:MaxPermSize=512m"
scalaVersion=2.11
+sparkVersion=2.3.0
release=false