You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by chenghao-intel <gi...@git.apache.org> on 2014/12/09 04:55:33 UTC

[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

GitHub user chenghao-intel opened a pull request:

    https://github.com/apache/spark/pull/3640

    [SPARK-4785] [SQL] Support udf instance ser/de after initialization

    UDF contract change in Hive 0.13.1. In Hive 0.12.0, it's always safe to construct and initialize a fresh UDF object on worker side, while in Hive 0.13.1, UDF objects should only be initialized on driver side and then serialized to the worker side. We provide the ability to serialize the initialized UDF instance and deserialize them cross process boundary.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark udf_serde

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3640.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3640
    
----
commit 19cbd463c60fcf408b003fa53f3a1c7d8c7cbac3
Author: Cheng Hao <ha...@intel.com>
Date:   2014-12-09T03:46:05Z

    support udf instance ser/de after initialization

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66295827
  
      [Test build #24247 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24247/consoleFull) for   PR 3640 at commit [`74466a3`](https://github.com/apache/spark/commit/74466a3517996d837ba9f8acb6b912a82bbbd6df).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class HiveFunctionWrapper(var functionClassName: String) extends java.io.Serializable `
      * `class HiveFunctionWrapper(var functionClassName: String) extends java.io.Externalizable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66234309
  
      [Test build #24238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24238/consoleFull) for   PR 3640 at commit [`e9c3212`](https://github.com/apache/spark/commit/e9c32129cdeb14b424bf9c19445efba9378fc2ba).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66239947
  
      [Test build #24240 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24240/consoleFull) for   PR 3640 at commit [`396c0e1`](https://github.com/apache/spark/commit/396c0e1bf10d4ca69675801aa68bf9b21ba5c9bf).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66295835
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24247/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66233507
  
      [Test build #24237 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24237/consoleFull) for   PR 3640 at commit [`19cbd46`](https://github.com/apache/spark/commit/19cbd463c60fcf408b003fa53f3a1c7d8c7cbac3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66234312
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24238/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66233511
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24237/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66319557
  
    The current design of this PR is derived from some background knowledges. I'd like to provide a brief summary here for future reference.
    
    As mentioned in the PR description, different from Hive 0.12.0, in Hive 0.13.1 UDF/UDAF/UDTF (aka Hive function) objects should only be initialized once on the driver side and then serialized to executors. However, not all function objects are serializable (e.g. `GenericUDF` doesn't implement `Serializable`). Hive 0.13.1 solves this issue with Kryo or XML serializer. Several utility ser/de methods are provided in class `o.a.h.h.q.e.Utilities` for this purpose. In this PR we chose Kryo for efficiency. The Kryo serializer used here is created in Hive. Spark Kryo serializer wasn't used because there's no available `SparkConf` instance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512252
  
    --- Diff: sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala ---
    @@ -49,6 +49,16 @@ import org.apache.spark.sql.catalyst.types.DecimalType
     private[hive] object HiveShim {
       val version = "0.12.0"
     
    +  import org.apache.hadoop.hive.ql.exec.Utilities
    +
    +  def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[UDFType]): UDFType = {
    +    Utilities.deserializePlan(is).asInstanceOf[UDFType]
    +  }
    +
    +  def serializePlan(function: Any, out: java.io.OutputStream): Unit = {
    +    Utilities.serializePlan(function, out)
    +  }
    +
    --- End diff --
    
    Instead of calling `Utilities.deserializePlan`, how about mimic `Utilities.de/serializeObjectByKryo` methods here? Those two functions are private but very simple, the advantage is that we don't need the expensive `HiveConf` instantiation here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66244351
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24240/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66297896
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24248/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/3640


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66244346
  
      [Test build #24240 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24240/consoleFull) for   PR 3640 at commit [`396c0e1`](https://github.com/apache/spark/commit/396c0e1bf10d4ca69675801aa68bf9b21ba5c9bf).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512696
  
    --- Diff: sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala ---
    @@ -48,6 +48,17 @@ import scala.language.implicitConversions
     private[hive] object HiveShim {
       val version = "0.13.1"
     
    +  import org.apache.hadoop.hive.ql.exec.Utilities
    +  import org.apache.hadoop.hive.conf.HiveConf
    +
    +  def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[UDFType]): UDFType = {
    +    Utilities.deserializePlan(is, clazz, new HiveConf())
    +  }
    +
    +  def serializePlan(function: Any, out: java.io.OutputStream): Unit = {
    +    Utilities.serializePlan(function, out, new HiveConf())
    +  }
    +
    --- End diff --
    
    Yes, initializing the HiveConf is quite expensive, particularly within the executor, I will fix it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512701
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    --- End diff --
    
    Another comment, related to `HiveShim`. I was thinking to move this class rather than `de/serializePlan` methods into the shim layer. Hive 0.12.0 is not affected by SPARK-4785, thus the version in 0.12.0 shim can be very simple. We only need to handle 0.13.1 there. This also lowers the possibility of breaking 0.12.0 code paths.
    
    Also after moving this class into the shim layer, as I've mentioned in another comment, instead of relying on `de/serializePlan`, we can just mimic `Utilities.de/serializeObjectByKryo` in `read/writeExternal` methods in this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66251380
  
    Appreciate a lot for fixing this case! The serialization wrapper class makes sense. However, would like to make some refactoring. A summary of my comments above:
    
    1. Renaming `HiveFunctionCache` to `HiveFunctionWrapper`
    2. Moving `HiveFunctionWrapper` to the shim layer, and keep Hive 0.12.0 code path exactly the same as before.
    
       Considering Hive 0.12.0 is not affected by this issue, and 1.2.0 release is really close, I'd like to lower the possibility of breaking 0.12.0 code paths as much as possible. 
    
    3. Add a `HiveSimpleUdfWrapper`, which inherits from `HiveFunctionWrapper`.
    
       As you've mentioned in the code, `HiveSimpleUdf` is a special case, it shouldn't be serialized, and should always create a fresh object on executor side. Currently this special case complicates `HiveFunctionWrapper` implementation and makes it somewhat confusing. Defining a special subclass for `HvieSimpleUdf` helps making `HiveFunctionWrapper` simpler (e.g. no need to serialize the boolean flag any more).
    
    4. Avoid using `Utilities.de/serializePlan` by mimicking `Utilities.de/serializeObjectByKryo` in the `read/writeExternal` methods of the wrapper class, so that we can remove the expensive `HiveConf` instantiation.
    
    In a word, we can add two classes, `HiveFunctionWrapper` and `HiveSImpleUdfWrapper` into the shim layer, and make sure that the 0.12.0 version behaves exactly the same as before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66332833
  
    Thanks a lot guys for digging into this!  Merged to master and branch 1.2


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66231529
  
      [Test build #24237 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24237/consoleFull) for   PR 3640 at commit [`19cbd46`](https://github.com/apache/spark/commit/19cbd463c60fcf408b003fa53f3a1c7d8c7cbac3).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512453
  
    --- Diff: sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala ---
    @@ -48,6 +48,17 @@ import scala.language.implicitConversions
     private[hive] object HiveShim {
       val version = "0.13.1"
     
    +  import org.apache.hadoop.hive.ql.exec.Utilities
    +  import org.apache.hadoop.hive.conf.HiveConf
    +
    +  def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[UDFType]): UDFType = {
    +    Utilities.deserializePlan(is, clazz, new HiveConf())
    +  }
    +
    +  def serializePlan(function: Any, out: java.io.OutputStream): Unit = {
    +    Utilities.serializePlan(function, out, new HiveConf())
    +  }
    +
    --- End diff --
    
    Instead of calling `Utilities.deserializePlan`, how about mimic `Utilities.de/serializeObjectByKryo` methods here? Those two functions are private but very simple, the advantage is that we don't need the expensive `HiveConf` instantiation here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66232292
  
      [Test build #24238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24238/consoleFull) for   PR 3640 at commit [`e9c3212`](https://github.com/apache/spark/commit/e9c32129cdeb14b424bf9c19445efba9378fc2ba).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512868
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    +  // for Serialization
    +  def this() = this(null)
    +
    +  private var instance: Any = null
    +
    +  def writeExternal(out: java.io.ObjectOutput) {
    +    // output the function name
    +    out.writeUTF(functionClassName)
    +
    +    // Write a flag if instance is null or not
    +    out.writeBoolean(instance != null)
    +    if (instance != null) {
    +      // Some of the UDF are serializable, but some others are not
    +      // Hive Utilities can handle both cases
    +      val baos = new java.io.ByteArrayOutputStream()
    +      HiveShim.serializePlan(instance, baos)
    +      val functionInBytes = baos.toByteArray
    +
    +      // output the function bytes
    +      out.writeInt(functionInBytes.length)
    +      out.write(functionInBytes, 0, functionInBytes.length)
    +    }
    +  }
     
    -  type UDFType
    -  type EvaluatedType = Any
    +  def readExternal(in: java.io.ObjectInput) {
    +    // read the function name
    +    functionClassName = in.readUTF()
     
    -  def nullable = true
    +    if (in.readBoolean()) {
    +      // if the instance is not null
    +      // read the function in bytes
    +      val functionInBytesLength = in.readInt()
    +      val functionInBytes = new Array[Byte](functionInBytesLength)
    +      in.read(functionInBytes, 0, functionInBytesLength)
     
    -  lazy val function = createFunction[UDFType]()
    +      // deserialize the function object via Hive Utilities
    +      instance = HiveShim.deserializePlan(new java.io.ByteArrayInputStream(functionInBytes),
    +        getContextOrSparkClassLoader.loadClass(functionClassName))
    +    }
    +  }
     
    -  override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})"
    +  def createFunction[UDFType](alwaysCreateNewInstance: Boolean = false) = {
    +    if (alwaysCreateNewInstance) {
    +      getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    +    } else {
    +      if (instance == null) {
    +        instance = getContextOrSparkClassLoader.loadClass(functionClassName).newInstance
    +      }
    +      instance.asInstanceOf[UDFType]
    +    }
    --- End diff --
    
    Can be simplified to:
    
    ```scala
    if (!alwaysCreateNewInstance && instance == null) {
      instance = ...
    }
    instance.asInstanceOf[UDFType]
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21511900
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    --- End diff --
    
    `HiveFunctionWrapper` might be a better name, since essentially this class is just used to make Hive function objects serializable. And traditionally Spark uses "wrapper" as suffix for classes with similar purposes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66287710
  
      [Test build #24248 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24248/consoleFull) for   PR 3640 at commit [`8e13756`](https://github.com/apache/spark/commit/8e137565e34e36361faf90670ab8e98743051410).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512352
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    +  // for Serialization
    +  def this() = this(null)
    +
    +  private var instance: Any = null
    +
    +  def writeExternal(out: java.io.ObjectOutput) {
    +    // output the function name
    +    out.writeUTF(functionClassName)
    +
    +    // Write a flag if instance is null or not
    +    out.writeBoolean(instance != null)
    --- End diff --
    
    Does this relate to the `HiveSimpleUdf` and UDF bridge case you mentioned offline? Would be better to leave a comment here. I was confused when reading this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66286027
  
      [Test build #24247 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24247/consoleFull) for   PR 3640 at commit [`74466a3`](https://github.com/apache/spark/commit/74466a3517996d837ba9f8acb6b912a82bbbd6df).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512172
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    --- End diff --
    
    Yes, that's a good suggestion, will update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3640#issuecomment-66297888
  
      [Test build #24248 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24248/consoleFull) for   PR 3640 at commit [`8e13756`](https://github.com/apache/spark/commit/8e137565e34e36361faf90670ab8e98743051410).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class HiveFunctionWrapper(var functionClassName: String) extends java.io.Serializable `
      * `class HiveFunctionWrapper(var functionClassName: String) extends java.io.Externalizable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512634
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    +  // for Serialization
    +  def this() = this(null)
    +
    +  private var instance: Any = null
    +
    +  def writeExternal(out: java.io.ObjectOutput) {
    +    // output the function name
    +    out.writeUTF(functionClassName)
    +
    +    // Write a flag if instance is null or not
    +    out.writeBoolean(instance != null)
    --- End diff --
    
    Yes, actually I leave a comment in above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512962
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
    +  // for Serialization
    +  def this() = this(null)
    +
    +  private var instance: Any = null
    +
    +  def writeExternal(out: java.io.ObjectOutput) {
    +    // output the function name
    +    out.writeUTF(functionClassName)
    +
    +    // Write a flag if instance is null or not
    +    out.writeBoolean(instance != null)
    +    if (instance != null) {
    +      // Some of the UDF are serializable, but some others are not
    +      // Hive Utilities can handle both cases
    +      val baos = new java.io.ByteArrayOutputStream()
    +      HiveShim.serializePlan(instance, baos)
    +      val functionInBytes = baos.toByteArray
    +
    +      // output the function bytes
    +      out.writeInt(functionInBytes.length)
    +      out.write(functionInBytes, 0, functionInBytes.length)
    +    }
    +  }
     
    -  type UDFType
    -  type EvaluatedType = Any
    +  def readExternal(in: java.io.ObjectInput) {
    +    // read the function name
    +    functionClassName = in.readUTF()
     
    -  def nullable = true
    +    if (in.readBoolean()) {
    +      // if the instance is not null
    +      // read the function in bytes
    +      val functionInBytesLength = in.readInt()
    +      val functionInBytes = new Array[Byte](functionInBytesLength)
    +      in.read(functionInBytes, 0, functionInBytesLength)
     
    -  lazy val function = createFunction[UDFType]()
    +      // deserialize the function object via Hive Utilities
    +      instance = HiveShim.deserializePlan(new java.io.ByteArrayInputStream(functionInBytes),
    +        getContextOrSparkClassLoader.loadClass(functionClassName))
    +    }
    +  }
     
    -  override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})"
    +  def createFunction[UDFType](alwaysCreateNewInstance: Boolean = false) = {
    +    if (alwaysCreateNewInstance) {
    +      getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    +    } else {
    +      if (instance == null) {
    +        instance = getContextOrSparkClassLoader.loadClass(functionClassName).newInstance
    +      }
    +      instance.asInstanceOf[UDFType]
    +    }
    --- End diff --
    
    Actually, how about removing the `alwaysCreateNewInstance` argument (which is confusing), and define a new `HiveSimpleUdfWrapper` that overrides `createFunction`, and always return a new instance?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org