Posted to commits@datafu.apache.org by ey...@apache.org on 2019/02/26 12:13:16 UTC

[datafu] 02/02: Add ScalaPython bridge and joinWithRangeAndDedup

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git

commit 36dfef565bd0de8521e95dfc60b820de14fbc36f
Author: Ohad Raviv <or...@paypal.com>
AuthorDate: Tue Feb 26 14:08:49 2019 +0200

    Add ScalaPython bridge and joinWithRangeAndDedup
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 .gitignore                                         |   2 +-
 datafu-spark/build.gradle                          |   2 +-
 .../META-INF/services/datafu.spark.PythonResource  |   1 +
 .../src/main/resources/pyspark_utils/__init__.py   |   0
 .../main/resources/pyspark_utils/bridge_utils.py   |  55 ++++++++
 .../src/main/resources/pyspark_utils/df_utils.py   | 144 +++++++++++++++++++
 .../resources/pyspark_utils/init_spark_context.py  |   5 +
 .../src/main/scala/datafu/spark/DataFrameOps.scala |   4 +
 .../scala/datafu/spark/PythonPathsManager.scala    | 147 ++++++++++++++++++++
 .../scala/datafu/spark/ScalaPythonBridge.scala     | 152 +++++++++++++++++++++
 .../src/main/scala/datafu/spark/SparkDFUtils.scala | 141 +++++++++++++------
 .../spark/utils/overwrites/SparkPythonRunner.scala | 140 +++++++++++++++++++
 .../META-INF/services/datafu.spark.PythonResource  |   3 +
 .../built_in_pyspark_lib/py4j-0.10.6-src.zip       | Bin 0 -> 80352 bytes
 .../resources/built_in_pyspark_lib/pyspark.zip     | Bin 0 -> 541536 bytes
 .../src/test/resources/example_tests/__init__.py   |   0
 .../test/resources/example_tests/df_utils_tests.py |  71 ++++++++++
 .../test/resources/example_tests/pyfromscala.py    |  74 ++++++++++
 .../example_tests/pyfromscala_with_error.py        |   3 +
 datafu-spark/src/test/resources/log4j.properties   |   5 +
 datafu-spark/src/test/resources/text.csv           |   5 +
 .../datafu/spark/PySparkLibTestResources.scala     |  28 ++++
 .../scala/datafu/spark/TestScalaPythonBridge.scala | 111 +++++++++++++++
 .../test/scala/datafu/spark/TestSparkDFUtils.scala |  36 ++++-
 gradle.properties                                  |   3 +-
 25 files changed, 1084 insertions(+), 48 deletions(-)

diff --git a/.gitignore b/.gitignore
index 96aeaa0..9a2f99a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,4 @@ gradlew*
 datafu-scala-spark/out
 datafu-scala-spark/derby.log
 datafu-spark/spark-warehouse
-datafu-spark/metastore_db
+datafu-spark/metastore_db
\ No newline at end of file
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index 3a9d181..867ca42 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -37,7 +37,7 @@ allprojects {
 	}
 }
 
-archivesBaseName = 'datafu-spark_' + scalaVersion
+archivesBaseName = 'datafu-spark_' + scalaVersion + '_' + sparkVersion
 
 import groovy.xml.MarkupBuilder
 
diff --git a/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource b/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource
new file mode 100644
index 0000000..a029545
--- /dev/null
+++ b/datafu-spark/src/main/resources/META-INF/services/datafu.spark.PythonResource
@@ -0,0 +1 @@
+datafu.spark.CoreBridgeDirectory
\ No newline at end of file
diff --git a/datafu-spark/src/main/resources/pyspark_utils/__init__.py b/datafu-spark/src/main/resources/pyspark_utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
new file mode 100644
index 0000000..69927d3
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -0,0 +1,55 @@
+import os
+
+from py4j.java_gateway import JavaGateway, GatewayClient
+from pyspark.conf import SparkConf
+from pyspark.context import SparkContext
+from pyspark.sql import SparkSession
+
+# use the jvm gateway to create a java class instance by its fully-qualified class name
+def _getjvm_class(gateway, fullClassName):
+    return gateway.jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(fullClassName).newInstance()
+
+
+class Context(object):
+
+    def __init__(self):
+        from py4j.java_gateway import java_import
+        """When running a Python script from Scala - this function is called
+        by the script to initialize the connection to the Java Gateway and get the spark context.
+        code is basically copied from: 
+         https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
+        """
+
+        if os.environ.get("SPARK_EXECUTOR_URI"):
+            SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
+
+        gateway = JavaGateway(GatewayClient(port=int(os.environ.get("PYSPARK_GATEWAY_PORT"))), auto_convert=True)
+        java_import(gateway.jvm, "org.apache.spark.SparkEnv")
+        java_import(gateway.jvm, "org.apache.spark.SparkConf")
+        java_import(gateway.jvm, "org.apache.spark.api.java.*")
+        java_import(gateway.jvm, "org.apache.spark.api.python.*")
+        java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+        java_import(gateway.jvm, "org.apache.spark.sql.*")
+        java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+
+        intp = gateway.entry_point
+
+        jSparkSession = intp.pyGetSparkSession()
+        jsc = intp.pyGetJSparkContext(jSparkSession)
+        jconf = intp.pyGetSparkConf(jsc)
+        conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
+        self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+
+        # Spark 2
+        self.sparkSession = SparkSession(self.sc, jSparkSession)
+        self.sqlContext = self.sparkSession._wrapped
+
+ctx = None
+
+
+def get_contexts():
+    global ctx
+    if not ctx:
+        ctx = Context()
+
+    return ctx.sc, ctx.sqlContext, ctx.sparkSession
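
The intp.pyGetSparkSession() / pyGetJSparkContext() / pyGetSparkConf() calls above resolve to methods on the Scala entry point object registered with the py4j gateway. A condensed sketch of that contract (the full version is datafu.spark.ScalaPythonBridge, added later in this commit):

    // Sketch of the Scala entry point the Python Context talks to over py4j.
    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.SparkSession

    object EntryPointSketch {
      // called by the Python side as intp.pyGetSparkSession(), etc.
      def pyGetSparkSession(): SparkSession = SparkSession.builder().getOrCreate()
      def pyGetJSparkContext(spark: SparkSession): JavaSparkContext =
        new JavaSparkContext(spark.sparkContext)
      def pyGetSparkConf(jsc: JavaSparkContext): SparkConf = jsc.getConf
    }
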
diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
new file mode 100644
index 0000000..746ae44
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -0,0 +1,144 @@
+from pyspark.sql import DataFrame
+
+from pyspark_utils.bridge_utils import _getjvm_class
+
+
+class PySparkDFUtils(object):
+
+    def __init__(self):
+        self._sc = None
+
+    def _initSparkContext(self, sc, sqlContext):
+        self._sc = sc
+        self._sqlContext = sqlContext
+        self._gateway = sc._gateway
+
+    def _get_jvm_spark_utils(self):
+        jvm_utils = _getjvm_class(self._gateway, "datafu.spark.SparkDFUtilsBridge")
+        return jvm_utils
+
+    # public:
+
+    def dedup(self, dataFrame, groupCol, orderCols = []):
+        """
+        Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+        :param dataFrame: DataFrame to operate on
+        :param groupCol: column to group by the records
+        :param orderCols: columns to order the records according to.
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+        java_cols = self._cols_to_java_cols(orderCols)
+        jdf = self._get_jvm_spark_utils().dedup(dataFrame._jdf, groupCol._jc, java_cols)
+        return DataFrame(jdf, self._sqlContext)
+
+    def dedupTopN(self, dataFrame, n, groupCol, orderCols = []):
+        """
+        Used to get the top N records (after ordering according to the provided order columns) in each group.
+        :param dataFrame: DataFrame to operate on
+        :param n: number of records to return from each group
+        :param groupCol: column to group by the records
+        :param orderCols: columns to order the records according to
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+        java_cols = self._cols_to_java_cols(orderCols)
+        jdf = self._get_jvm_spark_utils().dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
+        return DataFrame(jdf, self._sqlContext)
+
+    def dedup2(self, dataFrame, groupCol, orderByCol, desc = True, columnsFilter = [], columnsFilterKeep = True):
+        """
+        Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+        :param dataFrame: DataFrame to operate on
+        :param groupCol: column to group by the records
+        :param orderByCol: column to order the records according to
+        :param desc: have the order as desc
+        :param columnsFilter: columns to filter
+        :param columnsFilterKeep: indicates whether we should filter the selected columns 'out' or alternatively have only
+                                  those columns in the result
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+        jdf = self._get_jvm_spark_utils().dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
+        return DataFrame(jdf, self._sqlContext)
+
+    def changeSchema(self, dataFrame, newScheme = []):
+        """
+        Returns a DataFrame with the column names renamed to the column names in the new schema
+        :param dataFrame: DataFrame to operate on
+        :param newScheme: new column names
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(dataFrame._sc, dataFrame.sql_ctx)
+        jdf = self._get_jvm_spark_utils().changeSchema(dataFrame._jdf, newScheme)
+        return DataFrame(jdf, self._sqlContext)
+
+    def joinSkewed(self, dfLeft, dfRight, joinExprs, numShards = 30, joinType= "inner"):
+        """
+        Used to perform a join when the right df is relatively small but still too big to fit in memory for a broadcast join.
+        Use cases:
+            a. excluding keys that might be skewed, taken from a medium-sized list.
+            b. joining a big skewed table with a table that has a small number of very big rows.
+        :param dfLeft: left DataFrame
+        :param dfRight: right DataFrame
+        :param joinExprs: join expression
+        :param numShards: number of shards
+        :param joinType: join type
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(dfLeft._sc, dfLeft.sql_ctx)
+        utils = self._get_jvm_spark_utils()
+        jdf = utils.joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
+        return DataFrame(jdf, self._sqlContext)
+
+    def broadcastJoinSkewed(self, notSkewed, skewed, joinCol, numberCustsToBroadcast):
+        """
+        Suitable for performing a join when one DF is skewed and the other is not.
+        Splits both DFs into two parts according to the skewed keys.
+        1. Map-join: broadcasts the skewed-keys part of the not-skewed DF to the skewed-keys part of the skewed DF
+        2. Regular join: between the remaining two parts.
+        :param notSkewed: not skewed DataFrame
+        :param skewed: skewed DataFrame
+        :param joinCol: join column
+        :param numberCustsToBroadcast: number of skewed keys (rows) to broadcast
+        :return: DataFrame representing the data after the operation
+        """
+        self._initSparkContext(skewed._sc, skewed.sql_ctx)
+        jdf = self._get_jvm_spark_utils().broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
+        return DataFrame(jdf, self._sqlContext)
+
+    def joinWithRange(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor):
+        """
+        Helper function to join a table that has a point column with a table that has a range over the same column.
+        For example, an IP table joined with whois data where each line holds a range of IPs.
+        The main problem this handles is that a naive explode on the range can result in a huge table.
+        Requires:
+        1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
+        2. the range and single columns to be numeric.
+        """
+        self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
+        jdf = self._get_jvm_spark_utils().joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
+        return DataFrame(jdf, self._sqlContext)
+
+    def joinWithRangeAndDedup(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange):
+        """
+        Helper function to join a table that has a point column with a table that has a range over the same column.
+        For example, an IP table joined with whois data where each line holds a range of IPs.
+        The main problem this handles is that a naive explode on the range can result in a huge table.
+        Requires:
+        1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
+        2. the range and single columns to be numeric.
+        """
+        self._initSparkContext(dfSingle._sc, dfSingle.sql_ctx)
+        jdf = self._get_jvm_spark_utils().joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
+        return DataFrame(jdf, self._sqlContext)
+
+    def _cols_to_java_cols(self, cols):
+        return self._map_if_needed(lambda x: x._jc, cols)
+
+    def _dfs_to_java_dfs(self, dfs):
+        return self._map_if_needed(lambda x: x._jdf, dfs)
+
+    def _map_if_needed(self, func, itr):
+        return map(func, itr) if itr is not None else itr
+
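The wrappers above delegate, through datafu.spark.SparkDFUtilsBridge, to the Scala implementations in SparkDFUtils.scala. For comparison, a rough Scala-side sketch of the same dedup calls (the data and column names are placeholders modeled on the tests below):

    // Sketch: the Scala counterparts of df_utils.dedup / dedupTopN.
    import datafu.spark.SparkDFUtils
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    object DfUtilsScalaSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val people = Seq(("a", "Alice", 34), ("a", "Sara", 33), ("b", "Bob", 36))
          .toDF("id", "name", "age")

        // keep the 'latest' record per id, ordered by age (name breaks ties)
        SparkDFUtils.dedup(people, col("id"), col("age").desc, col("name").desc).show()

        // keep the top 2 records per id
        SparkDFUtils.dedupTopN(people, 2, col("id"), col("age").desc).show()
      }
    }
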
diff --git a/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py b/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py
new file mode 100644
index 0000000..d879d03
--- /dev/null
+++ b/datafu-spark/src/main/resources/pyspark_utils/init_spark_context.py
@@ -0,0 +1,5 @@
+
+from pyspark_utils.bridge_utils import get_contexts
+sc, sqlContext, spark = get_contexts()
+
+print "initiated contexts"
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index 6f5ae18..f1ac607 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -40,6 +40,10 @@ object DataFrameOps {
     def joinWithRange(colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long = 2^8) =
       SparkDFUtils.joinWithRange(df, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
 
+    def joinWithRangeAndDedup(colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long = 2^8, dedupSmallRange: Boolean = true): DataFrame = {
+      SparkDFUtils.joinWithRangeAndDedup(df, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR, dedupSmallRange)
+    }
+
     def broadcastJoinSkewed(skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame = {
       SparkDFUtils.broadcastJoinSkewed(df, skewed, joinCol, numberCustsToBroadcast)
     }
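
Assuming the implicit class in DataFrameOps is brought into scope with import DataFrameOps._ (as the existing methods here suggest), the new method can be called directly on a DataFrame. A rough sketch with placeholder data modeled on the tests below:

    // Sketch: calling joinWithRangeAndDedup through the DataFrameOps syntax.
    import datafu.spark.DataFrameOps._
    import org.apache.spark.sql.SparkSession

    object DataFrameOpsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val dfSingle = Seq(("Laura", 34), ("Margaret", 36)).toDF("name", "age")
        val dfRange  = Seq(("a", 34, 36)).toDF("id1", "start", "end")

        // keep only the smallest matching range per point (the default)
        dfSingle.joinWithRangeAndDedup("age", dfRange, "start", "end").show()

        // or keep the largest matching range instead
        dfSingle.joinWithRangeAndDedup("age", dfRange, "start", "end",
                                       dedupSmallRange = false).show()
      }
    }
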
diff --git a/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala b/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala
new file mode 100644
index 0000000..b451846
--- /dev/null
+++ b/datafu-spark/src/main/scala/datafu/spark/PythonPathsManager.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io.{File, IOException}
+import java.net.JarURLConnection
+import java.nio.file.Paths
+import java.util.{MissingResourceException, ServiceLoader}
+
+import org.apache.log4j.Logger
+
+import scala.collection.JavaConverters._
+
+/**
+  * Represents a resource that needs to be added to PYTHONPATH used by ScalaPythonBridge.
+  *
+  * To ensure your python resources (modules, files, etc.) are properly added to the bridge, do the following:
+  * 1)  Put all the resources under some root directory with a unique name x, and make sure path/to/x
+  * is visible to the class loader (usually just use src/main/resources/x).
+  * 2)  Extend this class like this:
+  * class MyResource extends PythonResource("x")
+  * This assumes x is under src/main/resources/x
+  * 3)  (since we use ServiceLoader) Add a file to your jar/project:
+  * META-INF/services/datafu.spark.PythonResource
+  * with a single line containing the full name (including package) of MyResource.
+  *
+  * This process involves scanning the entire jar and copying files from the jar to some temporary location,
+  * so if your jar is really big consider putting the resources in a smaller jar.
+  *
+  * @param resourcePath   Path to the resource, will be loaded via getClass.getClassLoader.getResource()
+  * @param isAbsolutePath Set to true if the resource is in some absolute path rather than in jar (try to avoid that).
+  */
+abstract class PythonResource(val resourcePath: String, val isAbsolutePath: Boolean = false)
+
+/**
+  * There are two phases of resolving the python files' paths:
+  *
+  * 1) When launching spark:
+  *   the files need to be added to spark.executorEnv.PYTHONPATH
+  *   in cluster mode this is handled by com.paypal.risk.ars.bigdata.execution_fw.hadoop.spark.SparkTask
+  *   and in test mode by com.paypal.risk.ars.bigdata.hadoop.spark.utils.SparkTestManager
+  *
+  * 2) When executing a python file via the bridge:
+  *   the files need to be added to the process PYTHONPATH. This is different from the previous phase because
+  *   this python process is spawned by zonkey, not by spark, and always runs on the driver.
+  */
+object PythonPathsManager {
+
+  case class ResolvedResource(resource: PythonResource, resolvedLocation: String)
+
+  private val logger: Logger = Logger.getLogger(getClass)
+
+  val resources: Seq[ResolvedResource] =
+    ServiceLoader.load(classOf[PythonResource])
+      .asScala
+      .map(p => ResolvedResource(p, resolveDependencyLocation(p)))
+      .toSeq
+
+  logResolved
+
+  def getAbsolutePaths() = resources.map(_.resolvedLocation).distinct
+  def getAbsolutePathsForJava() = resources.map(_.resolvedLocation).distinct.asJava
+
+  def getPYTHONPATH(): String =
+    resources
+      .map(_.resolvedLocation)
+      .map(p => new File(p))
+      .map(_.getName) //get just the name of the file
+      .mkString(":")
+
+  private def resolveDependencyLocation(resource: PythonResource): String =
+    if (resource.isAbsolutePath) {
+      if (!new File(resource.resourcePath).exists())
+        throw new IOException("Could not find resource in absolute path: " + resource.resourcePath)
+      else {
+        logger.info("Using file absolute path: " + resource.resourcePath)
+        resource.resourcePath
+      }
+    } else {
+      Option(getClass.getClassLoader.getResource(resource.resourcePath)) match {
+        case None =>
+          logger.error("Didn't find resource in classpath! resource path: " + resource.resourcePath)
+          throw new MissingResourceException("Didn't find resource in classpath!", resource.getClass.getName, resource.resourcePath)
+        case Some(p) =>
+          p.toURI.getScheme match {
+            case "jar" =>
+              //if dependency is inside jar file, use jar file path:
+              val jarPath = new File(p.openConnection().asInstanceOf[JarURLConnection].getJarFileURL.toURI).getPath
+              logger.info(s"Dependency ${resource.resourcePath} found inside jar: " + jarPath)
+              jarPath
+            case "file" =>
+              val file = new File(p.getFile)
+              if (!file.exists()) {
+                logger.warn("Dependency not found, skipping: " + file.getPath)
+                null
+              }
+              else {
+                if (file.isDirectory) {       
+                  val t_path =
+                    if (System.getProperty("os.name").toLowerCase().contains("win")&& p.getPath().startsWith("/")) {
+                      val path = p.getPath.substring(1)
+                      logger.warn(s"Fixing path for windows operating system! converted ${p.getPath} to $path")
+                      path
+                    }
+                    else
+                      p.getPath
+                  val path = Paths.get(t_path)
+                  logger.info(s"Dependency found as directory: ${t_path}\n\tusing parent path: ${path.getParent}")
+                  path.getParent.toString
+                } else {
+                  logger.info("Dependency found as a file: " + p.getPath)
+                  p.getPath
+                }
+              }
+          }
+      }
+    }
+
+  private def logResolved = {
+    logger.info(
+      s"Discovered ${resources.size} python paths:\n" +
+        resources
+          .map(p => s"className: ${p.resource.getClass.getName}\n\tresource: ${p.resource.resourcePath}\n\tlocation: ${p.resolvedLocation}")
+          .mkString("\n")) + "\n\n"
+  }
+}
+
+/**
+  * Contains all python files needed by the bridge itself
+  */
+class CoreBridgeDirectory extends PythonResource("pyspark_utils")
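
A concrete illustration of the registration steps described in the scaladoc above (the package and class names here are hypothetical, modeled on CoreBridgeDirectory and the test resources added later in this commit):

    // Sketch: registering an extra python resource with the bridge.
    // 1) put the python files under src/main/resources/my_py_lib
    // 2) declare the resource:
    package com.example.bridge   // hypothetical package

    import datafu.spark.PythonResource

    class MyPyLib extends PythonResource("my_py_lib")

    // 3) add a ServiceLoader entry so PythonPathsManager can discover it:
    //    src/main/resources/META-INF/services/datafu.spark.PythonResource
    //    containing the single line: com.example.bridge.MyPyLib
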
diff --git a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
new file mode 100644
index 0000000..8b9cf82
--- /dev/null
+++ b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io._
+import java.net.URL
+import java.nio.file.Files
+import java.util.UUID
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.deploy.SparkPythonRunner
+import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
+
+
+case class ScalaPythonBridgeRunner(extraPath: String = "") {
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+  //for the bridge we take the full resolved location, since this runs on the driver where the files are local:
+  logger.info("constructing PYTHONPATH")
+  val pythonPath = (PythonPathsManager.getAbsolutePaths() ++
+    Array("pyspark.zip", "py4j-0.10.6-src.zip") ++
+    Option(extraPath).getOrElse("").split(",")).distinct
+
+  logger.info("Bridge PYTHONPATH: " + pythonPath.mkString(":"))
+
+  val runner = SparkPythonRunner(pythonPath.mkString(","))
+
+  def runPythonFile(filename: String): String = {
+    val pyScript = resolveRunnableScript(filename)
+    //    this.getClass.getClassLoader.asInstanceOf[URLClassLoader].getURLs.foreach(s => logger.info("class: " + s))
+    logger.info(s"Running python file $pyScript")
+    runner.runPyFile(pyScript)
+  }
+
+  def runPythonString(str: String): String = {
+    val tmpFile = writeToTempFile(str, "pyspark-tmp-file-", ".py")
+    logger.info("Running tmp PySpark file: " + tmpFile.getAbsolutePath + " with content:\n" + str)
+    runner.runPyFile(tmpFile.getAbsolutePath)
+  }
+
+  private def resolveRunnableScript(path: String): String = {
+    logger.info("Resolving python script location for: " + path)
+
+    val res: String = Option(this.getClass.getClassLoader.getResource(path)) match {
+      case None =>
+        logger.info("Didn't find script via classLoader, using as is: " + path)
+        path
+      case Some(resource) =>
+        resource.toURI.getScheme match {
+          case "jar" =>
+            //if inside jar, extract it and return cloned file:
+            logger.info("Script found inside jar, extracting...")
+            val outputFile = ResourceCloning.cloneResource(resource, path)
+            logger.info("Extracted file path: " + outputFile.getPath)
+            outputFile.getPath
+          case _ =>
+            logger.info("Using script original path: " + resource.getPath)
+            resource.getPath
+        }
+    }
+    res
+  }
+
+  private def writeToTempFile(contents: String, prefix: String, suffix: String): File = {
+    val tempFi = File.createTempFile(prefix, suffix)
+    tempFi.deleteOnExit()
+    val bw = new BufferedWriter(new FileWriter(tempFi))
+    bw.write(contents)
+    bw.close()
+    tempFi
+  }
+
+}
+
+/**
+  * Do not instantiate this class! Use the companion object instead.
+  * This class should only be used by python
+  */
+object ScalaPythonBridge { //need empty ctor for py4j gateway
+
+  /** Members used to allow a Python script to share context with the main Scala program calling it.
+    * The Python script calls:
+    * sc, sqlContext, spark = utils.get_contexts()
+    * and our Python util function get_contexts
+    * uses the following to create Python wrappers around the Java SparkContext and SQLContext.
+    */
+  // Called by python util get_contexts()
+  def pyGetSparkSession(): SparkSession = SparkSession.builder().getOrCreate()
+  def pyGetJSparkContext(sparkSession: SparkSession): JavaSparkContext = new JavaSparkContext(sparkSession.sparkContext)
+  def pyGetSparkConf(jsc: JavaSparkContext): SparkConf = jsc.getConf
+
+}
+
+/**
+  * Utility for extracting resource from a jar and copy it to a temporary location
+  */
+object ResourceCloning {
+
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
+  val uuid = UUID.randomUUID().toString.substring(6)
+  val outputTempDir = new File(System.getProperty("java.io.tmpdir"), s"risk_tmp/$uuid/cloned_resources/")
+  forceMkdirs(outputTempDir)
+
+  def cloneResource(resource: URL, outputFileName: String): File = {
+    val outputTmpFile = new File(outputTempDir, outputFileName)
+    if (outputTmpFile.exists()) {
+      logger.info(s"resource $outputFileName already exists, skipping..")
+      outputTmpFile
+    } else {
+      logger.info("cloning resource: " + resource)
+      if (!outputTmpFile.exists()) { //it is possible that the file was already extracted in the session
+        forceMkdirs(outputTmpFile.getParentFile)
+        val inputStream = resource.openStream()
+        streamToFile(outputTmpFile, inputStream)
+      }
+      outputTmpFile
+    }
+  }
+
+  private def forceMkdirs(dir: File) =
+    if (!dir.exists() && !dir.mkdirs())
+      throw new IOException("Failed to create " + dir.getPath)
+
+  private def streamToFile(outputFile: File, inputStream: InputStream) = {
+    try {
+      Files.copy(inputStream, outputFile.toPath)
+    }
+    finally {
+      inputStream.close()
+      assert(outputFile.exists())
+    }
+  }
+}
\ No newline at end of file
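
Typical driver-side usage of the runner, loosely following TestScalaPythonBridge further down (a sketch rather than a verbatim copy):

    // Sketch: run python files and snippets over the bridge from Scala.
    import datafu.spark.ScalaPythonBridgeRunner

    object RunnerSketch {
      def main(args: Array[String]): Unit = {
        val runner = ScalaPythonBridgeRunner()

        // initialize sc / sqlContext / spark inside the python process
        runner.runPythonFile("pyspark_utils/init_spark_context.py")

        // run an ad-hoc snippet; a python failure surfaces as a
        // "python bridge error: ..." RuntimeException on the Scala side
        runner.runPythonString("print 'hello from the bridge'")
      }
    }
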
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index f22a13f..61386bd 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -20,16 +20,59 @@ package datafu.spark
 
 import java.util.{List => JavaList}
 
-import org.apache.log4j.Logger
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{LongType, SparkOverwriteUDAFs, StructType}
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.SizeEstimator
 
-object SparkDFUtils extends SparkDFUtilsTrait {
+/**
+  * Class definition so we can expose this functionality in PySpark
+  */
+class SparkDFUtilsBridge {
+
+  def dedup(df: DataFrame, groupCol: Column, orderCols: JavaList[Column]): DataFrame = {
+    val converted = convertJavaListToSeq(orderCols)
+    SparkDFUtils.dedup(df = df, groupCol = groupCol, orderCols = converted: _*)
+  }
+
+  def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: JavaList[Column]): DataFrame = {
+    val converted = convertJavaListToSeq(orderCols)
+    SparkDFUtils.dedupTopN(df = df, n = n, groupCol = groupCol, orderCols = converted: _*)
+  }
+
+  def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean, columnsFilter: JavaList[String], columnsFilterKeep: Boolean): DataFrame = {
+    val columnsFilter_converted = convertJavaListToSeq(columnsFilter)
+    SparkDFUtils.dedup2(df = df, groupCol = groupCol, orderByCol = orderByCol, desc = desc, moreAggFunctions = Nil, columnsFilter = columnsFilter_converted, columnsFilterKeep = columnsFilterKeep)
+  }
+
+  def changeSchema(df: DataFrame, newScheme: JavaList[String]): DataFrame = {
+    val newScheme_converted = convertJavaListToSeq(newScheme)
+    SparkDFUtils.changeSchema(df = df, newScheme = newScheme_converted: _*)
+  }
+
+  def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int, joinType: String): DataFrame = {
+    SparkDFUtils.joinSkewed(dfLeft = dfLeft, dfRight = dfRight, joinExprs = joinExprs, numShards = numShards, joinType = joinType)
+  }
+
+  def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
+    SparkDFUtils.broadcastJoinSkewed(notSkewed = notSkewed, skewed = skewed, joinCol = joinCol, numRowsToBroadcast = numRowsToBroadcast)
+  }
+
+  def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+    SparkDFUtils.joinWithRange(dfSingle = dfSingle, colSingle = colSingle, dfRange = dfRange, colRangeStart = colRangeStart, colRangeEnd = colRangeEnd, DECREASE_FACTOR = DECREASE_FACTOR)
+  }
+
+  def joinWithRangeAndDedup(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long, dedupSmallRange: Boolean): DataFrame = {
+    SparkDFUtils.joinWithRangeAndDedup(dfSingle = dfSingle, colSingle = colSingle, dfRange = dfRange, colRangeStart = colRangeStart, colRangeEnd = colRangeEnd, DECREASE_FACTOR = DECREASE_FACTOR, dedupSmallRange = dedupSmallRange)
+  }
+
+  private def convertJavaListToSeq[T](list: JavaList[T]): Seq[T] = {
+    scala.collection.JavaConverters.asScalaIteratorConverter(list.iterator()).asScala.toList
+  }
+}
+
+object SparkDFUtils {
 
   /**
     * Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
@@ -40,19 +83,21 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * @param orderCols columns to order the records according to
     * @return DataFrame representing the data after the operation
     */
-  override def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
+  def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
+    df.dropDuplicates()
     dedupTopN(df, 1, groupCol, orderCols: _*)
   }
 
   /**
    * Used to get the top N records (after ordering according to the provided order columns) in each group.
+    *
     * @param df DataFrame to operate on
     * @param n number of records to return from each group
     * @param groupCol column to group by the records
     * @param orderCols columns to order the records according to
     * @return DataFrame representing the data after the operation
     */
-  override def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
+  def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
     val w = Window.partitionBy(groupCol).orderBy(orderCols: _*)
     df.withColumn("rn", row_number.over(w)).where(col("rn") <= n).drop("rn")
   }
@@ -71,7 +116,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     *                          those columns in the result
     * @return DataFrame representing the data after the operation
     */
-  override def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean = true, moreAggFunctions: Seq[Column] = Nil, columnsFilter: Seq[String] = Nil, columnsFilterKeep: Boolean = true): DataFrame = {
+  def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean = true, moreAggFunctions: Seq[Column] = Nil, columnsFilter: Seq[String] = Nil, columnsFilterKeep: Boolean = true): DataFrame = {
     val newDF = if (columnsFilter == Nil)
       df.withColumn("sort_by_column", orderByCol)
     else {
@@ -81,7 +126,6 @@ object SparkDFUtils extends SparkDFUtilsTrait {
         df.select(df.columns.filter(colName => !columnsFilter.contains(colName)).map(colName => new Column(colName)):_*).withColumn("sort_by_column", orderByCol)
     }
 
-
     val aggFunc = if (desc) SparkOverwriteUDAFs.maxValueByKey(_:Column, _:Column)
                   else      SparkOverwriteUDAFs.minValueByKey(_:Column, _:Column)
 
@@ -115,7 +159,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
   * @param colName column name for a column of type StructType
   * @return DataFrame representing the data after the operation
   */
-  override def flatten(df: DataFrame, colName: String): DataFrame = {
+  def flatten(df: DataFrame, colName: String): DataFrame = {
     assert(df.schema(colName).dataType.isInstanceOf[StructType], s"Column $colName must be of type Struct")
     val outerFields = df.schema.fields.map(_.name).toSet
     val flattenFields = df.schema(colName).dataType.asInstanceOf[StructType].fields.filter(f => !outerFields.contains(f.name)).map("`" + colName + "`.`" + _.name + "`")
@@ -124,11 +168,12 @@ object SparkDFUtils extends SparkDFUtilsTrait {
 
   /**
   * Returns a DataFrame with the column names renamed to the column names in the new schema
+  *
   * @param df DataFrame to operate on
   * @param newScheme new column names
   * @return DataFrame representing the data after the operation
   */
-  override def changeSchema(df: DataFrame, newScheme: String*): DataFrame =
+  def changeSchema(df: DataFrame, newScheme: String*): DataFrame =
     df.select(df.columns.zip(newScheme).map {case (oldCol: String, newCol: String) => col(oldCol).as(newCol)}: _*)
 
   /**
@@ -144,7 +189,7 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * @param joinType join type
     * @return joined DataFrame
     */
-  override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
+  def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
     // skew join based on salting
     // salts the left DF by adding another random column and join with the right DF after duplicating it
     val ss = dfLeft.sparkSession
@@ -158,13 +203,14 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * splits both of the DFs to two parts according to the skewed keys.
     * 1. Map-join: broadcasts the skewed-keys part of the not skewed DF to the skewed-keys part of the skewed DF
     * 2. Regular join: between the remaining two parts.
+    *
     * @param notSkewed not skewed DataFrame
     * @param skewed skewed DataFrame
     * @param joinCol join column
     * @param numRowsToBroadcast num of rows to broadcast
     * @return DataFrame representing the data after the operation
     */
-  override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
+  def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
     val ss = notSkewed.sparkSession
     import ss.implicits._
     val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numRowsToBroadcast).drop("count")
@@ -212,11 +258,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * +----------+---------+----------+
     *
     * OUTPUT:
-    * +-------+----------+---------+-------+
-    * |time   |start_time|end_time |desc   |
-    * +-------+----------+---------+-------+
-    * |11:55  |11:50     |12:15    | lunch |
-    * +-------+----------+---------+-------+
+    * +-------+----------+---------+---------+
+    * |time   |start_time|end_time |desc     |
+    * +-------+----------+---------+---------+
+    * |11:55  |10:00     |12:00    | meeting |
+    * +-------+----------+---------+---------+
+    * |11:55  |11:50     |12:15    | lunch   |
+    * +-------+----------+---------+---------+
     *
     * @param dfSingle - DataFrame that contains the point column
     * @param colSingle - the point column's name
@@ -226,43 +274,58 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * @param DECREASE_FACTOR - resolution factor. instead of exploding the range column directly, we first decrease its resolution by this factor
     * @return
     */
-  override def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+  def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
+    val dfJoined = joinWithRangeInternal(dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
+    dfJoined.drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
+  }
+
+  private def joinWithRangeInternal(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
 
     import org.apache.spark.sql.functions.udf
     val rangeUDF = udf((start: Long, end: Long) => (start to end).toArray)
     val dfRange_exploded = dfRange.withColumn("range_start", col(colRangeStart).cast(LongType))
                                   .withColumn("range_end"  , col(colRangeEnd).cast(LongType))
                                   .withColumn("decreased_range_single", explode(rangeUDF(col("range_start")/lit(DECREASE_FACTOR),
-                                                                                         col("range_end"  )/lit(DECREASE_FACTOR))))
+                                                                                                   col("range_end")  /lit(DECREASE_FACTOR))))
 
-    val dfJoined =
     dfSingle.withColumn("single", floor(col(colSingle).cast(LongType)))
             .withColumn("decreased_single", floor(col(colSingle).cast(LongType)/lit(DECREASE_FACTOR)))
             .join(dfRange_exploded, col("decreased_single") === col("decreased_range_single"), "left_outer")
             .withColumn("range_size", expr("(range_end - range_start + 1)"))
             .filter("single>=range_start and single<=range_end")
-
-    dedup2(dfJoined, col(colSingle), col("range_size"), desc = false)
-      .drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
-
   }
-}
-
-
-trait SparkDFUtilsTrait {
-  def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame
-
-  def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame
 
-  def dedup2(df: DataFrame, groupCol: Column, orderByCol: Column, desc: Boolean, moreAggFunctions: Seq[Column], columnsFilter: Seq[String], columnsFilterKeep: Boolean): DataFrame
-
-  def flatten(df: DataFrame, colName: String): DataFrame
-
-  def changeSchema(df: DataFrame, newScheme: String*): DataFrame
+  /**
+    * Run joinWithRange and afterwards run dedup
+    *
+    * @param dedupSmallRange - if true, keep the record with the smallest matching range; if false, keep the one with the largest range
+    *
+    *  OUTPUT for dedupSmallRange = "true":
+    * +-------+----------+---------+---------+
+    * |time   |start_time|end_time |desc     |
+    * +-------+----------+---------+---------+
+    * |11:55  |11:50     |12:15    | lunch   |
+    * +-------+----------+---------+---------+
+    *
+    *  OUTPUT for dedupSmallRange = "false":
+    * +-------+----------+---------+---------+
+    * |time   |start_time|end_time |desc     |
+    * +-------+----------+---------+---------+
+    * |11:55  |10:00     |12:00    | meeting |
+    * +-------+----------+---------+---------+
+    *
+    */
+  def joinWithRangeAndDedup(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long, dedupSmallRange: Boolean): DataFrame = {
 
-  def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int, joinType: String): DataFrame
+    val dfJoined = joinWithRangeInternal(dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, DECREASE_FACTOR)
 
-  def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame
+    // "range_start" is here for consistency
+    val dfDeduped = if (dedupSmallRange) {
+      dedup2(dfJoined, col(colSingle), struct("range_size", "range_start"), desc = false)
+    } else {
+      dedup2(dfJoined, col(colSingle), struct(expr("-range_size"), col("range_start")), desc = true)
+    }
 
-  def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame
+    dfDeduped.drop("range_start", "range_end", "decreased_range_single", "single", "decreased_single", "range_size")
+  }
 }
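
To make the new dedupSmallRange behavior concrete, a rough sketch that reproduces the meeting/lunch example from the scaladoc above, with times encoded as minutes since the range and point columns must be numeric:

    // Sketch: joinWithRange vs. joinWithRangeAndDedup on the scaladoc example.
    import datafu.spark.SparkDFUtils
    import org.apache.spark.sql.SparkSession

    object RangeJoinSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val points = Seq(715L).toDF("time")                  // 11:55 -> 715 minutes
        val ranges = Seq((600L, 720L, "meeting"),            // 10:00 - 12:00
                         (710L, 735L, "lunch"))              // 11:50 - 12:15
          .toDF("start_time", "end_time", "desc")

        // joinWithRange keeps every overlapping range (both rows here)
        SparkDFUtils.joinWithRange(points, "time", ranges, "start_time", "end_time", 10L).show()

        // joinWithRangeAndDedup keeps one row per point:
        //   dedupSmallRange = true  -> the smallest matching range ("lunch")
        //   dedupSmallRange = false -> the largest matching range ("meeting")
        SparkDFUtils.joinWithRangeAndDedup(points, "time", ranges,
                                           "start_time", "end_time", 10L, true).show()
      }
    }
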
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
new file mode 100644
index 0000000..3eed62c
--- /dev/null
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.io._
+
+import datafu.spark.ScalaPythonBridge
+import org.apache.log4j.Logger
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.Utils
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * We wrap Spark's PythonRunner because of premature python process termination:
+  * in PythonRunner the python process exits immediately when it finishes running the given file,
+  * which caused Accumulator exceptions when the driver tried to fetch accumulator data
+  * from the python gateway.
+  * Instead, like in Zeppelin, we create an "interactive" python process, feed it the python
+  * script and keep the gateway open.
+  */
+case class SparkPythonRunner(pyPaths: String, otherArgs: Array[String] = Array()) {
+
+  val logger: Logger = Logger.getLogger(getClass)
+  val (reader, writer, process) = initPythonEnv()
+
+  def runPyFile(pythonFile: String): String = {
+
+    val formattedPythonFile = PythonRunner.formatPath(pythonFile)
+    execFile(formattedPythonFile, writer, reader)
+
+  }
+
+  private def initPythonEnv(): (BufferedReader, BufferedWriter, Process) = {
+
+    val pythonExec =
+      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
+
+    // Format python filename paths before adding them to the PYTHONPATH
+    val formattedPyFiles = PythonRunner.formatPaths(pyPaths)
+
+    // Launch a Py4J gateway server for the process to connect to; this will let it see our
+    // Java system properties and such
+    val gatewayServer = new py4j.GatewayServer(ScalaPythonBridge, 0)
+    val thread = new Thread(new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
+        gatewayServer.start()
+      }
+    })
+    thread.setName("py4j-gateway-init")
+    thread.setDaemon(true)
+    thread.start()
+
+    // Wait until the gateway server has started, so that we know which port it is bound to.
+    // `gatewayServer.start()` will start a new thread and run the server code there, after
+    // initializing the socket, so the thread started above will end as soon as the server is
+    // ready to serve connections.
+    thread.join()
+
+    // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+    // python directories in SPARK_HOME (if set), and any files in the pyPaths argument
+    val pathElements = new ArrayBuffer[String]
+    pathElements ++= formattedPyFiles
+    pathElements += PythonUtils.sparkPythonPath
+    pathElements += sys.env.getOrElse("PYTHONPATH", "")
+    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+    logger.info(s"Running python with PYTHONPATH:\n\t${formattedPyFiles.mkString(",")}")
+
+    // Launch Python process
+    //val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
+    val builder = new ProcessBuilder((Seq(pythonExec, "-iu") ++ otherArgs).asJava)
+    val env = builder.environment()
+    env.put("PYTHONPATH", pythonPath)
+    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
+    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    //try {
+      val process = builder.start()
+      //new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+      val writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream))
+      val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
+
+      //      val exitCode = process.waitFor()
+      //      if (exitCode != 0) {
+      //        throw new SparkUserAppException(exitCode)
+      //      }
+    //} finally {
+      //      gatewayServer.shutdown()
+    //}
+
+    (reader, writer, process)
+  }
+
+
+  private def execFile(filename: String, writer: BufferedWriter, reader: BufferedReader): String = {
+    writer.write("import traceback\n")
+    writer.write("try:\n")
+    writer.write("    execfile('"+filename+"')\n")
+    writer.write("    print (\"*!?flush reader!?*\")\n")
+    writer.write("except Exception as e:\n")
+    writer.write("    traceback.print_exc()\n")
+    writer.write("    print (\"*!?flush error reader!?*\")\n\n")
+//    writer.write("    exit(1)\n\n")
+    writer.flush()
+    var output = ""
+    var line: String = reader.readLine
+    while (!line.contains("*!?flush reader!?*") && !line.contains("*!?flush error reader!?*")) {
+      //logger.debug("Read line from python shell : " + line)
+      System.out.println(line)
+      if (line == "...") {
+        //logger.warn("Syntax error ! ")
+        output += "Syntax error ! "
+      }
+      output += "\r" + line + "\n"
+      line = reader.readLine
+    }
+
+    if (line.contains("*!?flush error reader!?*"))
+      throw new RuntimeException("python bridge error: " + output)
+
+    output
+  }
+
+}
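
SparkPythonRunner can also be constructed directly when the PYTHONPATH is already known; a minimal sketch with placeholder paths:

    // Sketch: driving the interactive python process directly.
    import org.apache.spark.deploy.SparkPythonRunner

    object PythonRunnerSketch {
      def main(args: Array[String]): Unit = {
        // comma-separated python paths, as ScalaPythonBridgeRunner assembles them above
        val runner = SparkPythonRunner("/tmp/pyspark.zip,/tmp/py4j-0.10.6-src.zip")
        val output = runner.runPyFile("/tmp/my_script.py")   // hypothetical script path
        println(output)
      }
    }
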
diff --git a/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource b/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource
new file mode 100644
index 0000000..1b6add8
--- /dev/null
+++ b/datafu-spark/src/test/resources/META-INF/services/datafu.spark.PythonResource
@@ -0,0 +1,3 @@
+datafu.spark.ExampleFiles
+datafu.spark.Py4JResource
+datafu.spark.PysparkResource
\ No newline at end of file
diff --git a/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip b/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip
new file mode 100644
index 0000000..2f8edcc
Binary files /dev/null and b/datafu-spark/src/test/resources/built_in_pyspark_lib/py4j-0.10.6-src.zip differ
diff --git a/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip b/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip
new file mode 100644
index 0000000..b6d1e63
Binary files /dev/null and b/datafu-spark/src/test/resources/built_in_pyspark_lib/pyspark.zip differ
diff --git a/datafu-spark/src/test/resources/example_tests/__init__.py b/datafu-spark/src/test/resources/example_tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/datafu-spark/src/test/resources/example_tests/df_utils_tests.py b/datafu-spark/src/test/resources/example_tests/df_utils_tests.py
new file mode 100644
index 0000000..304db02
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/df_utils_tests.py
@@ -0,0 +1,71 @@
+import os
+import sys
+from pprint import pprint as p
+
+from pyspark_utils.df_utils import PySparkDFUtils
+
+p('CHECKING IF PATHS EXISTS:')
+for x in sys.path:
+    p('PATH ' + x + ': ' + str(os.path.exists(x)))
+
+
+df_utils = PySparkDFUtils()
+
+df_people = sqlContext.createDataFrame([
+    ("a", "Alice", 34),
+    ("a", "Sara", 33),
+    ("b", "Bob", 36),
+    ("b", "Charlie", 30),
+    ("c", "David", 29),
+    ("c", "Esther", 32),
+    ("c", "Fanny", 36),
+    ("c", "Zoey", 36)],
+    ["id", "name", "age"])
+
+func_dedup_res = df_utils.dedup(dataFrame=df_people, groupCol=df_people.id,
+                             orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedup_res.registerTempTable("dedup")
+
+func_dedupTopN_res = df_utils.dedupTopN(dataFrame=df_people, n=2, groupCol=df_people.id,
+                                     orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedupTopN_res.registerTempTable("dedupTopN")
+
+func_dedup2_res = df_utils.dedup2(dataFrame=df_people, groupCol=df_people.id, orderByCol=df_people.age, desc=True,
+                               columnsFilter=["name"], columnsFilterKeep=False)
+func_dedup2_res.registerTempTable("dedup2")
+
+func_changeSchema_res = df_utils.changeSchema(dataFrame=df_people, newScheme=["id1", "name1", "age1"])
+func_changeSchema_res.registerTempTable("changeSchema")
+
+df_people2 = sqlContext.createDataFrame([
+    ("a", "Laura", 34),
+    ("a", "Stephani", 33),
+    ("b", "Margaret", 36)],
+    ["id", "name", "age"])
+
+simpleDF = sqlContext.createDataFrame([
+    ("a", "1")],
+    ["id", "value"])
+from pyspark.sql.functions import expr
+
+func_joinSkewed_res = df_utils.joinSkewed(dfLeft=df_people2.alias("df1"), dfRight=simpleDF.alias("df2"),
+                                       joinExprs=expr("df1.id == df2.id"), numShards=5,
+                                       joinType="inner")
+func_joinSkewed_res.registerTempTable("joinSkewed")
+
+func_broadcastJoinSkewed_res = df_utils.broadcastJoinSkewed(notSkewed=df_people2, skewed=simpleDF, joinCol="id",
+                                                         numberCustsToBroadcast=5)
+func_broadcastJoinSkewed_res.registerTempTable("broadcastJoinSkewed")
+
+dfRange = sqlContext.createDataFrame([
+    ("a", 34, 36)],
+    ["id1", "start", "end"])
+func_joinWithRange_res = df_utils.joinWithRange(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
+                                             colRangeStart="start", colRangeEnd="end",
+                                             decreaseFactor=5)
+func_joinWithRange_res.registerTempTable("joinWithRange")
+
+func_joinWithRangeAndDedup_res = df_utils.joinWithRangeAndDedup(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
+                                                  colRangeStart="start", colRangeEnd="end",
+                                                  decreaseFactor=5, dedupSmallRange=True)
+func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
diff --git a/datafu-spark/src/test/resources/example_tests/pyfromscala.py b/datafu-spark/src/test/resources/example_tests/pyfromscala.py
new file mode 100644
index 0000000..2ba99cd
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/pyfromscala.py
@@ -0,0 +1,74 @@
+
+# print the PYTHONPATH
+import sys
+from pprint import pprint as p
+p(sys.path)
+
+from pyspark.sql import functions as F
+
+
+import os
+print os.getcwd()
+
+
+###############################################################
+# query scala defined DF
+###############################################################
+dfout = sqlContext.sql("select num * 2 as d from dfin")
+dfout.registerTempTable("dfout")
+dfout.groupBy(dfout['d']).count().show()
+sqlContext.sql("select count(*) as cnt from dfout").show()
+dfout.groupBy(dfout['d']).agg(F.count(F.col('d')).alias('cnt')).show()
+
+sqlContext.sql("select d * 4 as d from dfout").registerTempTable("dfout2")
+
+
+###############################################################
+# check python UDFs
+###############################################################
+
+def magic_func(s):
+
+    return s + " magic"
+
+sqlContext.udf.register("magic", magic_func)
+
+
+###############################################################
+# check sc.textFile
+###############################################################
+
+DEL = '\x10'
+
+from pyspark.sql.types import StructType, StructField
+from pyspark.sql.types import StringType
+
+schema = StructType([
+    StructField("A", StringType()),
+    StructField("B", StringType())
+])
+
+txt_df = sqlContext.read.csv('src/test/resources/text.csv', sep=DEL, schema=schema)
+
+print type(txt_df)
+print dir(txt_df)
+print txt_df.count()
+
+txt_df.show()
+
+txt_df2 = sc.textFile('src/test/resources/text.csv').map(lambda x: x.split(DEL)).toDF()
+txt_df2.show()
+
+
+###############################################################
+# convert python dict to DataFrame
+###############################################################
+
+d = {'a': 0.1, 'b': 2}
+d = [(k,1.0*d[k]) for k in d]
+stats_df = sc.parallelize(d, 1).toDF(["name", "val"])
+stats_df.registerTempTable('stats')
+
+sqlContext.table("stats").show()
+
+
diff --git a/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py b/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py
new file mode 100644
index 0000000..a2ba4e5
--- /dev/null
+++ b/datafu-spark/src/test/resources/example_tests/pyfromscala_with_error.py
@@ -0,0 +1,3 @@
+
+
+sqlContext.sql("select * from edw.table_not_exists")
diff --git a/datafu-spark/src/test/resources/log4j.properties b/datafu-spark/src/test/resources/log4j.properties
new file mode 100644
index 0000000..7422151
--- /dev/null
+++ b/datafu-spark/src/test/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootCategory=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %5p %c{3} - %m%n
diff --git a/datafu-spark/src/test/resources/text.csv b/datafu-spark/src/test/resources/text.csv
new file mode 100644
index 0000000..c0d981d
--- /dev/null
+++ b/datafu-spark/src/test/resources/text.csv
@@ -0,0 +1,5 @@
+14
+52
+47
+38
+03
\ No newline at end of file
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
new file mode 100644
index 0000000..a68ee76
--- /dev/null
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+class PysparkResource extends PythonResource(PathsResolver.pyspark, true)
+
+class Py4JResource extends PythonResource(PathsResolver.py4j, true)
+
+object PathsResolver {
+  val pyspark = ResourceCloning.cloneResource(getClass.getResource("/built_in_pyspark_lib/pyspark.zip"), "pyspark_cloned.zip").getPath
+  val py4j = ResourceCloning.cloneResource(getClass.getResource("/built_in_pyspark_lib/py4j-0.10.6-src.zip"), "py4j_cloned.zip").getPath
+}
\ No newline at end of file
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
new file mode 100644
index 0000000..f3bea97
--- /dev/null
+++ b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.junit._
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import scala.util.Try
+
+object TestScalaPythonBridge {
+
+  def getNewRunner(): ScalaPythonBridgeRunner = {
+    val runner = ScalaPythonBridgeRunner()
+    runner.runPythonFile("pyspark_utils/init_spark_context.py")
+    runner
+  }
+
+  def getNewSparkSession(): SparkSession = {
+
+    val tempDir = Utils.createTempDir()
+    val localMetastorePath = new File(tempDir, "metastore").getCanonicalPath
+    val localWarehousePath = new File(tempDir, "wharehouse").getCanonicalPath
+    val pythonPath = PythonPathsManager.getAbsolutePaths().mkString(File.pathSeparator)
+    println("Creating SparkConf with PYTHONPATH: " + pythonPath)
+    val sparkConf = new SparkConf()
+    .setMaster("local[1]")
+    .set("spark.sql.warehouse.dir", localWarehousePath)
+    .set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastorePath;create=true")
+    //.set("datanucleus.rdbms.datastoreAdapterClassName", "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
+    .setExecutorEnv(Seq(("PYTHONPATH", pythonPath)))
+    .setAppName("Spark Unit Test")
+
+    val builder = SparkSession.builder().config(sparkConf).enableHiveSupport()
+    val spark = builder.getOrCreate()
+
+    spark
+  }
+}
+
+@RunWith(classOf[JUnitRunner])
+class TestScalaPythonBridge extends FunSuite {
+
+  private val spark = TestScalaPythonBridge.getNewSparkSession
+  private lazy val runner = TestScalaPythonBridge.getNewRunner()
+
+  def assertTable(tableName: String, expected: String): Unit =
+    Assert.assertEquals(expected, spark.table(tableName).collect().sortBy(_.toString).mkString(", "))
+
+
+  test("pyfromscala.py") {
+
+    import spark.implicits._
+
+    val dfin = spark.sparkContext.parallelize(1 to 10).toDF("num")
+    dfin.createOrReplaceTempView("dfin")
+
+    runner.runPythonFile("example_tests/pyfromscala.py")
+
+    // try to invoke python udf from scala code
+    assert(spark.sql("select magic('python_udf')").collect().mkString(",") == "[python_udf magic]")
+
+    //spark.sql("select * from dfout").registerTempTable("output")
+    assertTable("dfout", "[10], [12], [14], [16], [18], [20], [2], [4], [6], [8]")
+    assertTable("dfout2", "[16], [24], [32], [40], [48], [56], [64], [72], [80], [8]")
+    assertTable("stats", "[a,0.1], [b,2.0]")
+  }
+
+  test("pyfromscala_with_error.py") {
+    val t = Try(runner.runPythonFile("example_tests/pyfromscala_with_error.py"))
+    assert(t.isFailure)
+    assert(t.failed.get.isInstanceOf[RuntimeException])
+  }
+  
+  test("SparkDFUtilsBridge") {
+    runner.runPythonFile("example_tests/df_utils_tests.py")
+    assertTable("dedup", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
+    assertTable("dedupTopN", "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,Fanny,36], [c,Zoey,36]")
+    assertTable("dedup2", "[a,34], [b,36], [c,36]")
+    assertTable("changeSchema", "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,David,29], [c,Esther,32], [c,Fanny,36], [c,Zoey,36]")
+    assertTable("joinSkewed", "[a,Laura,34,a,1], [a,Stephani,33,a,1]")
+    assertTable("broadcastJoinSkewed", "[a,Laura,34,1], [a,Stephani,33,1]")
+    assertTable("joinWithRange", "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
+    assertTable("joinWithRangeAndDedup", "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
+  }
+
+}
+
+class ExampleFiles extends PythonResource("example_tests")
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 861a1c0..d6fa0f6 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -126,23 +126,47 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
        assertDataFrameEquals(expected, actual)
     }
 
-   	case class expRangeJoin(col_grp:String, col_ord:Option[Int], col_str:String, start:Option[Int], end:Option[Int], desc:String)
-  
+   	case class expRangeJoin2(col_grp:String, col_ord:Option[Int], col_str:String, start:Option[Int], end:Option[Int], desc:String)
+   	case class expRangeJoin1(col_grp:String, col_ord:Int, col_str:String, start:Option[Int], end:Option[Int], desc:String)
+
     test("join_with_range") {
       val df = sc.parallelize(List(("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4"))).toDF("col_grp", "col_ord", "col_str")
    	  val dfr = sc.parallelize(List((1, 2,"asd1"), (1, 4, "asd2"), (3, 5,"asd3"), (3, 10,"asd4"))).toDF("start", "end", "desc")
   
       val expected = sqlContext.createDataFrame(List(
-          expRangeJoin("b",Option(1),"asd4",Option(1),Option(2),"asd1"),
-          expRangeJoin("a",Option(3),"asd3",Option(3),Option(5),"asd3"),
-          expRangeJoin("a",Option(2),"asd2",Option(1),Option(2),"asd1")
-
+        expRangeJoin1("a",1,"asd1",Option(1),Option(2),"asd1"),
+        expRangeJoin1("a",1,"asd1",Option(1),Option(4),"asd2"),
+        expRangeJoin1("a",2,"asd2",Option(1),Option(2),"asd1"),
+        expRangeJoin1("a",2,"asd2",Option(1),Option(4),"asd2"),
+        expRangeJoin1("a",3,"asd3",Option(1),Option(4),"asd2"),
+        expRangeJoin1("a",3,"asd3",Option(3),Option(5),"asd3"),
+        expRangeJoin1("a",3,"asd3",Option(3),Option(10),"asd4"),
+        expRangeJoin1("b",1,"asd4",Option(1),Option(2),"asd1"),
+        expRangeJoin1("b",1,"asd4",Option(1),Option(4),"asd2")
       ))
       
       val actual = df.joinWithRange("col_ord", dfr, "start", "end")
+      actual.show()
       
       assertDataFrameEquals(expected, actual)
   }
+
+  test("join_with_range_and_dedup") {
+      val df = sc.parallelize(List(("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4"))).toDF("col_grp", "col_ord", "col_str")
+   	  val dfr = sc.parallelize(List((1, 2,"asd1"), (1, 4, "asd2"), (3, 5,"asd3"), (3, 10,"asd4"))).toDF("start", "end", "desc")
+
+      val expected = sqlContext.createDataFrame(List(
+          expRangeJoin2("b",Option(1),"asd4",Option(1),Option(2),"asd1"),
+          expRangeJoin2("a",Option(3),"asd3",Option(3),Option(5),"asd3"),
+          expRangeJoin2("a",Option(2),"asd2",Option(1),Option(2),"asd1")
+
+      ))
+
+      val actual = df.joinWithRangeAndDedup("col_ord", dfr, "start", "end")
+      actual.show()
+
+      assertDataFrameEquals(expected, actual)
+  }
   
     test("broadcastJoinSkewed") {
       val skewedList = List(("1", "a"), ("1", "b"), ("1", "c"), ("1", "d"), ("1", "e"),("2", "k"),("0", "k"))
diff --git a/gradle.properties b/gradle.properties
index 10ce0cf..05d1749 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,8 +16,9 @@
 # under the License.
 
 group=org.apache.datafu
-version=1.4.0
+version=1.5.0
 gradleVersion=4.8.1
 org.gradle.jvmargs="-XX:MaxPermSize=512m"
 scalaVersion=2.11
+sparkVersion=2.3.0
 release=false