Posted to commits@datafu.apache.org by ey...@apache.org on 2019/06/03 15:10:08 UTC

[datafu] branch spark-tmp updated: More review changes

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/spark-tmp by this push:
     new afa8e72  More review changes
afa8e72 is described below

commit afa8e72fac8c157b263c56cab1c651f142a4dd6b
Author: oraviv <or...@paypal.com>
AuthorDate: Mon Jun 3 17:14:38 2019 +0300

    More review changes
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 .../src/main/resources/pyspark_utils/df_utils.py   | 29 +++++++++-------------
 .../src/main/scala/datafu/spark/DataFrameOps.scala | 10 ++++++++
 .../scala/datafu/spark/ScalaPythonBridge.scala     |  9 +++++++
 .../spark/utils/overwrites/SparkPythonRunner.scala |  7 ++++--
 4 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index e5c3d14..f6261c5 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -34,6 +34,10 @@ class PySparkDFUtils(object):
         jvm_utils = _getjvm_class(self._gateway, "datafu.spark.SparkDFUtilsBridge")
         return jvm_utils
 
+    def _get_utils(self, df):
+        self._initSparkContext(df._sc, df.sql_ctx)
+        return self._get_jvm_spark_utils()
+
     # public:
 
     def dedup(self, dataFrame, groupCol, orderCols = []):
@@ -44,9 +48,8 @@ class PySparkDFUtils(object):
         :param orderCols: columns to order the records according to.
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
         java_cols = self._cols_to_java_cols(orderCols)
-        jdf = self._get_jvm_spark_utils().dedup(dataFrame._jdf, groupCol._jc, java_cols)
+        jdf = self._get_utils(dataFrame).dedup(dataFrame._jdf, groupCol._jc, java_cols)
         return DataFrame(jdf, self._sqlContext)
 
     def dedupTopN(self, dataFrame, n, groupCol, orderCols = []):
@@ -58,9 +61,8 @@ class PySparkDFUtils(object):
         :param orderCols: columns to order the records according to
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
         java_cols = self._cols_to_java_cols(orderCols)
-        jdf = self._get_jvm_spark_utils().dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
+        jdf = self._get_utils(dataFrame).dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
         return DataFrame(jdf, self._sqlContext)
 
     def dedup2(self, dataFrame, groupCol, orderByCol, desc = True, columnsFilter = [], columnsFilterKeep = True):
@@ -75,8 +77,7 @@ class PySparkDFUtils(object):
     *                          those columns in the result
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
-        jdf = self._get_jvm_spark_utils().dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
+        jdf = self._get_utils(dataFrame).dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
         return DataFrame(jdf, self._sqlContext)
 
     def changeSchema(self, dataFrame, newScheme = []):
@@ -86,8 +87,7 @@ class PySparkDFUtils(object):
         :param newScheme: new column names
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
-        jdf = self._get_jvm_spark_utils().changeSchema(dataFrame._jdf, newScheme)
+        jdf = self._get_utils(dataFrame).changeSchema(dataFrame._jdf, newScheme)
         return DataFrame(jdf, self._sqlContext)
 
     def joinSkewed(self, dfLeft, dfRight, joinExprs, numShards = 30, joinType= "inner"):
@@ -103,9 +103,7 @@ class PySparkDFUtils(object):
         :param joinType: join type
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(dfLeft._sc, dfLeft.sql_ctx)
-        utils = self._get_jvm_spark_utils()
-        jdf = utils.joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
+        jdf = self._get_utils(dfLeft).joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
         return DataFrame(jdf, self._sqlContext)
 
     def broadcastJoinSkewed(self, notSkewed, skewed, joinCol, numberCustsToBroadcast):
@@ -120,8 +118,7 @@ class PySparkDFUtils(object):
         :param numberCustsToBroadcast: number of custs to broadcast
         :return: DataFrame representing the data after the operation
         """
-        self._initSparkContext(skewed._sc, skewed.sql_ctx)
-        jdf = self._get_jvm_spark_utils().broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
+        jdf = self._get_utils(skewed).broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
         return DataFrame(jdf, self._sqlContext)
 
     def joinWithRange(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor):
@@ -133,8 +130,7 @@ class PySparkDFUtils(object):
         1. the single table needs to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
         2. the range and single columns need to be numeric.
         """
-        self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
-        jdf = self._get_jvm_spark_utils().joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
+        jdf = self._get_utils(dfSingle).joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
         return DataFrame(jdf, self._sqlContext)
 
     def joinWithRangeAndDedup(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange):
@@ -146,8 +142,7 @@ class PySparkDFUtils(object):
         1. the single table needs to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
         2. the range and single columns need to be numeric.
         """
-        self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
-        jdf = self._get_jvm_spark_utils().joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
+        jdf = self._get_utils(dfSingle).joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
         return DataFrame(jdf, self._sqlContext)
 
     def _cols_to_java_cols(self, cols):
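
[Editorial note: the Python methods above are thin wrappers that pass the underlying Java DataFrame to the JVM-side utilities through datafu.spark.SparkDFUtilsBridge. For orientation, here is a minimal Scala sketch of the equivalent direct call, assuming SparkDFUtils.dedup takes (df, groupCol, orderCols) as the wrapper's arguments suggest; the object name, toy data, and ordering semantics are illustrative only, not taken from this commit.]

    // Hedged sketch: assumes SparkDFUtils.dedup(df, groupCol, orderCols*) mirrors the
    // arguments forwarded by the Python wrapper above.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import datafu.spark.SparkDFUtils

    object DedupExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("dedup-example")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // toy data: two rows share the key "a"
        val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "ts")

        // keep one row per key, chosen by the ordering column (exact semantics assumed)
        val deduped = SparkDFUtils.dedup(df, col("key"), col("ts"))
        deduped.show()
      }
    }
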
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index 110f13f..8f74aa2 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -20,6 +20,16 @@ package datafu.spark
 
 import org.apache.spark.sql.{Column, DataFrame}
 
+/**
+ * Implicit class to enable easier usage, e.g.:
+ *
+ * df.dedup(..)
+ *
+ * instead of:
+ *
+ * SparkDFUtils.dedup(...)
+ *
+ */
 object DataFrameOps {
 
   implicit class someDataFrameUtils(df: DataFrame) {
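
[Editorial note: the comment added above documents the motivation for the implicit class. A minimal usage sketch follows, assuming the implicit methods simply forward to SparkDFUtils with the wrapped DataFrame as the first argument; the object name and toy data are illustrative.]

    // Hedged sketch of the implicit-class usage described in the comment above.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import datafu.spark.DataFrameOps._

    object ImplicitOpsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("implicit-ops-example")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "ts")

        // the implicit conversion lets us call dedup directly on the DataFrame
        // instead of SparkDFUtils.dedup(df, ...)
        df.dedup(col("key"), col("ts")).show()
      }
    }
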
diff --git a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
index 4e0fc77..1726916 100644
--- a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
@@ -30,6 +30,15 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.deploy.SparkPythonRunner
 import org.apache.spark.sql.SparkSession
 
+
+/**
+ * This class lets the user invoke PySpark code from Scala.
+ * Example usage:
+ *
+ * val runner = ScalaPythonBridgeRunner()
+ * runner.runPythonFile("my_package/my_pyspark_logic.py")
+ *
+ */
 case class ScalaPythonBridgeRunner(extraPath: String = "") {
 
   val logger = LoggerFactory.getLogger(this.getClass)
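
[Editorial note: the comment above already shows the intended call pattern. A slightly fuller, hedged sketch follows; the script path is hypothetical, and how the runner resolves it is not shown in this diff.]

    // Hedged sketch expanding on the example in the comment above.
    import datafu.spark.ScalaPythonBridgeRunner

    object BridgeExample {
      def main(args: Array[String]): Unit = {
        // extraPath defaults to "" per the case-class signature above
        val runner = ScalaPythonBridgeRunner()

        // run a PySpark script; the path here is hypothetical and must be
        // resolvable by the runner (resource or file)
        runner.runPythonFile("my_package/my_pyspark_logic.py")
      }
    }
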
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
index c904587..6345467 100644
--- a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
@@ -28,8 +28,11 @@ import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.util.Utils
 
 /**
- * We wrap Spark's PythonRunner because we failed on premature python process closing.
- * in PythonRunner the python process exits immediately when finished to read the filename,
+ * Internal class - should not be used by the user
+ *
+ * Background:
+ * We had to "override" Spark's PythonRunner because we ran into premature Python process closing.
+ * In PythonRunner the Python process exits immediately after it finishes reading the file;
  * this caused Accumulator exceptions when the driver tried to get accumulation data
  * from the Python gateway.
  * Instead, like in Zeppelin, we create an "interactive" python process, feed it the python