You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by ya...@apache.org on 2017/01/27 02:57:38 UTC

incubator-hivemall git commit: Close #29: [HIVEMALL-39][SPARK] Put the use of HiveUDFs in one place

Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 70f42038a -> 88fae5208


Close #29: [HIVEMALL-39][SPARK] Put the use of HiveUDFs in one place


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/88fae520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/88fae520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/88fae520

Branch: refs/heads/master
Commit: 88fae52085f8e29d36cc038879b1d2a41f383a6c
Parents: 70f4203
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Fri Jan 27 11:57:27 2017 +0900
Committer: Takeshi YAMAMURO <li...@gmail.com>
Committed: Fri Jan 27 11:57:27 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HivemallOps.scala | 1109 +++++++++---------
 .../sql/hive/internal/HivemallOpsImpl.scala     |   78 ++
 .../spark/sql/hive/HivemallOpsSuite.scala       |   39 +-
 .../spark/sql/hive/ModelMixingSuite.scala       |   10 +-
 .../apache/spark/sql/hive/XGBoostSuite.scala    |    4 +-
 .../streaming/HivemallOpsWithFeatureSuite.scala |    2 +-
 6 files changed, 674 insertions(+), 568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index e3e20ee..6e588a9 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal, NamedExpression, UserDefinedGenerator}
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -54,607 +53,563 @@ import org.apache.spark.unsafe.types.UTF8String
  * @groupname misc
  */
 final class HivemallOps(df: DataFrame) extends Logging {
+  import internal.HivemallOpsImpl._
 
   private[this] val _sparkSession = df.sparkSession
   private[this] val _analyzer = _sparkSession.sessionState.analyzer
 
   /**
-   * @see hivemall.regression.AdaDeltaUDTF
+   * @see [[hivemall.regression.AdaDeltaUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_adadelta(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_adadelta",
-        new HiveFunctionWrapper("hivemall.regression.AdaDeltaUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.AdaDeltaUDTF",
+      "train_adadelta",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.AdaGradUDTF
+   * @see [[hivemall.regression.AdaGradUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_adagrad(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_adagrad",
-        new HiveFunctionWrapper("hivemall.regression.AdaGradUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.AdaGradUDTF",
+      "train_adagrad",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.AROWRegressionUDTF
+   * @see [[hivemall.regression.AROWRegressionUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_arow_regr",
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.AROWRegressionUDTF",
+      "train_arow_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe
+   * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_arowe_regr",
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.AROWRegressionUDTF$AROWe",
+      "train_arowe_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe2
+   * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_arowe2_regr",
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe2"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.AROWRegressionUDTF$AROWe2",
+      "train_arowe2_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.regression.LogressUDTF
+   * @see [[hivemall.regression.LogressUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_logregr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_logregr",
-        new HiveFunctionWrapper("hivemall.regression.LogressUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.LogressUDTF",
+      "train_logregr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF
+   * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa1_regr",
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.PassiveAggressiveRegressionUDTF",
+      "train_pa1_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a
+   * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa1a_regr",
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a",
+      "train_pa1a_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2
+   * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa2_regr",
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2",
+      "train_pa2_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a
+   * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa2a_regr",
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a",
+      "train_pa2a_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.smile.regression.RandomForestRegressionUDTF
+   * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]]
    * @group regression
    */
   @scala.annotation.varargs
   def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_randomforest_regr",
-        new HiveFunctionWrapper("hivemall.smile.regression.RandomForestRegressionUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
+    planHiveGenericUDTF(
+      df,
+      "hivemall.smile.regression.RandomForestRegressionUDTF",
+      "train_randomforest_regr",
+      setMixServs(toHivemallFeatures(exprs)),
       Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
-        .map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    )
   }
 
   /**
-   * @see hivemall.classifier.PerceptronUDTF
+   * @see [[hivemall.classifier.PerceptronUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_perceptron(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_perceptron",
-        new HiveFunctionWrapper("hivemall.classifier.PerceptronUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.PerceptronUDTF",
+      "train_perceptron",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF
+   * @see [[hivemall.classifier.PassiveAggressiveUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_pa(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa",
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.PassiveAggressiveUDTF",
+      "train_pa",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA1
+   * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_pa1(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa1",
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA1"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.PassiveAggressiveUDTF$PA1",
+      "train_pa1",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA2
+   * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_pa2(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_pa2",
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA2"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.PassiveAggressiveUDTF$PA2",
+      "train_pa2",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.ConfidenceWeightedUDTF
+   * @see [[hivemall.classifier.ConfidenceWeightedUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_cw(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_cw",
-        new HiveFunctionWrapper("hivemall.classifier.ConfidenceWeightedUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.ConfidenceWeightedUDTF",
+      "train_cw",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.AROWClassifierUDTF
+   * @see [[hivemall.classifier.AROWClassifierUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_arow(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_arow",
-        new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.AROWClassifierUDTF",
+      "train_arow",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.AROWClassifierUDTF$AROWh
+   * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_arowh(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_arowh",
-        new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF$AROWh"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.AROWClassifierUDTF$AROWh",
+      "train_arowh",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
+   * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_scw(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_scw",
-        new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW1"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1",
+      "train_scw",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
+   * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW2]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_scw2(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_scw2",
-        new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW2"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2",
+      "train_scw2",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.AdaGradRDAUDTF
+   * @see [[hivemall.classifier.AdaGradRDAUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-       "train_adagrad_rda",
-        new HiveFunctionWrapper("hivemall.classifier.AdaGradRDAUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.AdaGradRDAUDTF",
+      "train_adagrad_rda",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.smile.classification.RandomForestClassifierUDTF
+   * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]]
    * @group classifier
    */
   @scala.annotation.varargs
   def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-       "train_randomforest_classifier",
-        new HiveFunctionWrapper("hivemall.smile.classification.RandomForestClassifierUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
+    planHiveGenericUDTF(
+      df,
+      "hivemall.smile.classification.RandomForestClassifierUDTF",
+      "train_randomforest_classifier",
+      setMixServs(toHivemallFeatures(exprs)),
       Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
-        .map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.MulticlassPerceptronUDTF
+   * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_perceptron(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_perceptron",
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPerceptronUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassPerceptronUDTF",
+      "train_multiclass_perceptron",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.PassiveAggressiveUDTF
+   * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_pa",
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF",
+      "train_multiclass_pa",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA1
+   * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_pa1",
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1",
+      "train_multiclass_pa1",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA2
+   * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_pa2",
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2",
+      "train_multiclass_pa2",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.MulticlassConfidenceWeightedUDTF
+   * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_cw",
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF",
+      "train_multiclass_cw",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.MulticlassAROWClassifierUDTF
+   * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_arow",
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF",
+      "train_multiclass_arow",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW1
+   * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_scw",
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1"),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1",
+      "train_multiclass_scw",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight", "conv")
+    )
   }
 
   /**
-   * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW2
+   * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]]
    * @group classifier.multiclass
    */
   @scala.annotation.varargs
   def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_multiclass_scw2",
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2"
-        ),
-        setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2",
+      "train_multiclass_scw2",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "feature", "weight", "conv")
+    )
   }
 
   /**
    * :: Experimental ::
-   * @see hivemall.xgboost.regression.XGBoostRegressionUDTF
+   * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]]
    * @group xgboost
    */
   @Experimental
   @scala.annotation.varargs
   def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_xgboost_regr",
-        new HiveFunctionWrapper("hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper"),
-        toHivemallFeatureDf(exprs : _*).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper",
+      "train_xgboost_regr",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("model_id", "pred_model")
+    )
   }
 
   /**
    * :: Experimental ::
-   * @see hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF
+   * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]]
    * @group xgboost
    */
   @Experimental
   @scala.annotation.varargs
   def train_xgboost_classifier(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_xgboost_classifier",
-        new HiveFunctionWrapper(
-          "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper"),
-        toHivemallFeatureDf(exprs : _*).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper",
+      "train_xgboost_classifier",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("model_id", "pred_model")
+    )
   }
 
   /**
    * :: Experimental ::
-   * @see hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF
+   * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]]
    * @group xgboost
    */
   @Experimental
   @scala.annotation.varargs
   def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "train_xgboost_multiclass_classifier",
-        new HiveFunctionWrapper(
-          "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper"
-        ),
-        toHivemallFeatureDf(exprs: _*).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper",
+      "train_xgboost_multiclass_classifier",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("model_id", "pred_model")
+    )
   }
 
   /**
    * :: Experimental ::
-   * @see hivemall.xgboost.tools.XGBoostPredictUDTF
+   * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]]
    * @group xgboost
    */
   @Experimental
   @scala.annotation.varargs
   def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "xgboost_predict",
-        new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostPredictUDTF"),
-        toHivemallFeatureDf(exprs: _*).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("rowid", "predicted").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.xgboost.tools.XGBoostPredictUDTF",
+      "xgboost_predict",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("rowid", "predicted")
+    )
   }
 
   /**
    * :: Experimental ::
-   * @see hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF
+   * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]]
    * @group xgboost
    */
   @Experimental
   @scala.annotation.varargs
   def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "xgboost_multiclass_predict",
-        new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF"),
-        toHivemallFeatureDf(exprs: _*).map(_.expr)
-      ),
-      join = false, outer = false, None,
-      Seq("rowid", "label", "probability").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF",
+      "xgboost_multiclass_predict",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("rowid", "label", "probability")
+    )
   }
 
   /**
-   * @see hivemall.knn.lsh.MinHashUDTF
+   * @see [[hivemall.knn.lsh.MinHashUDTF]]
    * @group knn.lsh
    */
   @scala.annotation.varargs
   def minhash(exprs: Column*): DataFrame = withTypedPlan {
-     Generate(HiveGenericUDTF(
-        "minhash",
-        new HiveFunctionWrapper("hivemall.knn.lsh.MinHashUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("clusterid", "item").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.knn.lsh.MinHashUDTF",
+      "minhash",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("clusterid", "item")
+    )
   }
 
   /**
-   * @see hivemall.ftvec.amplify.AmplifierUDTF
+   * @see [[hivemall.ftvec.amplify.AmplifierUDTF]]
    * @group ftvec.amplify
    */
   @scala.annotation.varargs
@@ -663,17 +618,17 @@ final class HivemallOps(df: DataFrame) extends Logging {
       case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name)
       case Column(expr: Expression) => UnresolvedAttribute(expr.simpleString)
     }
-    Generate(HiveGenericUDTF(
-        "amplify",
-        new HiveFunctionWrapper("hivemall.ftvec.amplify.AmplifierUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      outputAttr,
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.ftvec.amplify.AmplifierUDTF",
+      "amplify",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("clusterid", "item")
+    )
   }
 
   /**
-   * @see hivemall.ftvec.amplify.RandomAmplifierUDTF
+   * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]]
    * @group ftvec.amplify
    */
   @scala.annotation.varargs
@@ -702,48 +657,48 @@ final class HivemallOps(df: DataFrame) extends Logging {
 
   /**
    * Quantifies input columns.
-   * @see hivemall.ftvec.conv.QuantifyColumnsUDTF
+   * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]]
    * @group ftvec.conv
    */
   @scala.annotation.varargs
   def quantify(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "quantify",
-        new HiveFunctionWrapper("hivemall.ftvec.conv.QuantifyColumnsUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.ftvec.conv.QuantifyColumnsUDTF",
+      "quantify",
+      setMixServs(toHivemallFeatures(exprs)),
+      (0 until exprs.size - 1).map(i => s"c$i")
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.BinarizeLabelUDTF
+   * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def binarize_label(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "binarize_label",
-        new HiveFunctionWrapper("hivemall.ftvec.trans.BinarizeLabelUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.ftvec.trans.BinarizeLabelUDTF",
+      "binarize_label",
+      setMixServs(toHivemallFeatures(exprs)),
+      (0 until exprs.size - 1).map(i => s"c$i")
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.QuantifiedFeaturesUDTF
+   * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def quantified_features(exprs: Column*): DataFrame = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "quantified_features",
-        new HiveFunctionWrapper("hivemall.ftvec.trans.QuantifiedFeaturesUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("features").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
+    planHiveGenericUDTF(
+      df,
+      "hivemall.ftvec.trans.QuantifiedFeaturesUDTF",
+      "quantified_features",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("features")
+    )
   }
 
   /**
@@ -758,7 +713,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
   }
 
   /**
-   * Splits `org.apache.spark.ml.linalg.Vector` into pieces.
+   * Splits [[Vector]] into pieces.
    * @group ftvec
    */
   def explode_vector(expr: Column): DataFrame = {
@@ -811,37 +766,28 @@ final class HivemallOps(df: DataFrame) extends Logging {
   }
 
   /**
-   * Returns a new [[DataFrame]] with columns renamed.
-   * This is a wrapper for DataFrame#toDF.
+   * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper]]
    * @group misc
    */
   @scala.annotation.varargs
-  def as(colNames: String*): DataFrame = df.toDF(colNames: _*)
+  def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan {
+    planHiveGenericUDTF(
+      df,
+      "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper",
+      "lr_datagen",
+      setMixServs(toHivemallFeatures(exprs)),
+      Seq("label", "features")
+    )
+  }
 
   /**
    * Returns all the columns as Seq[Column] in this [[DataFrame]].
-   * @group misc
    */
-  def cols: Seq[Column] = {
+  private[sql] def cols: Seq[Column] = {
     df.schema.fields.map(col => df.col(col.name)).toSeq
   }
 
   /**
-   * @see hivemall.dataset.LogisticRegressionDataGeneratorUDTF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan {
-    Generate(HiveGenericUDTF(
-        "lr_datagen",
-        new HiveFunctionWrapper("hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "features").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
    * :: Experimental ::
    * If a parameter '-mix' does not exist in a 3rd argument,
    * set it from an environmental variable
@@ -868,7 +814,10 @@ final class HivemallOps(df: DataFrame) extends Logging {
     }
   }
 
-  @inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = {
+  /**
+   * If the input is a [[Vector]], transform it into Hivemall features.
+   */
+  @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = {
     df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map {
       case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c)
       case (_, c) => c
@@ -886,6 +835,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
 }
 
 object HivemallOps {
+  import internal.HivemallOpsImpl._
 
   /**
    * Implicitly inject the [[HivemallOps]] into [[DataFrame]].
@@ -894,417 +844,490 @@ object HivemallOps {
     new HivemallOps(df)
 
   /**
-   * @see hivemall.HivemallVersionUDF
+   * @see [[hivemall.HivemallVersionUDF]]
    * @group misc
    */
   def hivemall_version(): Column = withExpr {
-    HiveSimpleUDF("hivemall_version", new HiveFunctionWrapper("hivemall.HivemallVersionUDF"), Nil)
+    planHiveUDF(
+      "hivemall.HivemallVersionUDF",
+      "hivemall_version",
+      Nil
+    )
   }
 
   /**
-   * @see hivemall.knn.similarity.CosineSimilarityUDF
+   * @see [[hivemall.knn.similarity.CosineSimilarityUDF]]
    * @group knn.similarity
    */
   @scala.annotation.varargs
   def cosine_sim(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("cosine_sim",
-      new HiveFunctionWrapper("hivemall.knn.similarity.CosineSimilarityUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.similarity.CosineSimilarityUDF",
+      "cosine_sim",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.similarity.JaccardIndexUDF
+   * @see [[hivemall.knn.similarity.JaccardIndexUDF]]
    * @group knn.similarity
    */
   @scala.annotation.varargs
   def jaccard(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("jaccard",
-      new HiveFunctionWrapper("hivemall.knn.similarity.JaccardIndexUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.knn.similarity.JaccardIndexUDF",
+      "jaccard",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.similarity.AngularSimilarityUDF
+   * @see [[hivemall.knn.similarity.AngularSimilarityUDF]]
    * @group knn.similarity
    */
   @scala.annotation.varargs
   def angular_similarity(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("angular_similarity",
-      new HiveFunctionWrapper("hivemall.knn.similarity.AngularSimilarityUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.similarity.AngularSimilarityUDF",
+      "angular_similarity",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.similarity.EuclidSimilarity
+   * @see [[hivemall.knn.similarity.EuclidSimilarity]]
    * @group knn.similarity
    */
   @scala.annotation.varargs
   def euclid_similarity(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("euclid_similarity",
-      new HiveFunctionWrapper("hivemall.knn.similarity.EuclidSimilarity"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.similarity.EuclidSimilarity",
+      "euclid_similarity",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.similarity.Distance2SimilarityUDF
+   * @see [[hivemall.knn.similarity.Distance2SimilarityUDF]]
    * @group knn.similarity
    */
   @scala.annotation.varargs
   def distance2similarity(exprs: Column*): Column = withExpr {
     // TODO: Need a wrapper class because of using unsupported types
-    HiveGenericUDF("distance2similarity",
-      new HiveFunctionWrapper("hivemall.knn.similarity.Distance2SimilarityUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.similarity.Distance2SimilarityUDF",
+      "distance2similarity",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.HammingDistanceUDF
+   * @see [[hivemall.knn.distance.HammingDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def hamming_distance(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("hamming_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.HammingDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.knn.distance.HammingDistanceUDF",
+      "hamming_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.PopcountUDF
+   * @see [[hivemall.knn.distance.PopcountUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def popcnt(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("popcnt",
-      new HiveFunctionWrapper("hivemall.knn.distance.PopcountUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.knn.distance.PopcountUDF",
+      "popcnt",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.KLDivergenceUDF
+   * @see [[hivemall.knn.distance.KLDivergenceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def kld(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("kld",
-      new HiveFunctionWrapper("hivemall.knn.distance.KLDivergenceUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.knn.distance.KLDivergenceUDF",
+      "kld",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.EuclidDistanceUDF
+   * @see [[hivemall.knn.distance.EuclidDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def euclid_distance(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("euclid_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.EuclidDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.distance.EuclidDistanceUDF",
+      "euclid_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.CosineDistanceUDF
+   * @see [[hivemall.knn.distance.CosineDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def cosine_distance(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("cosine_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.CosineDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.distance.CosineDistanceUDF",
+      "cosine_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.AngularDistanceUDF
+   * @see [[hivemall.knn.distance.AngularDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def angular_distance(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("angular_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.AngularDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.distance.AngularDistanceUDF",
+      "angular_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.ManhattanDistanceUDF
+   * @see [[hivemall.knn.distance.ManhattanDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def manhattan_distance(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("manhattan_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.ManhattanDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.distance.ManhattanDistanceUDF",
+      "manhattan_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.distance.MinkowskiDistanceUDF
+   * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]]
    * @group knn.distance
    */
   @scala.annotation.varargs
   def minkowski_distance (exprs: Column*): Column = withExpr {
-    HiveGenericUDF("minkowski_distance",
-      new HiveFunctionWrapper("hivemall.knn.distance.MinkowskiDistanceUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.distance.MinkowskiDistanceUDF",
+      "minkowski_distance",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.lsh.bBitMinHashUDF
+   * @see [[hivemall.knn.lsh.bBitMinHashUDF]]
    * @group knn.lsh
    */
   @scala.annotation.varargs
   def bbit_minhash(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("bbit_minhash",
-      new HiveFunctionWrapper("hivemall.knn.lsh.bBitMinHashUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.knn.lsh.bBitMinHashUDF",
+      "bbit_minhash",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.knn.lsh.MinHashesUDF
+   * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]]
    * @group knn.lsh
    */
   @scala.annotation.varargs
   def minhashes(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("minhashes",
-      new HiveFunctionWrapper("hivemall.knn.lsh.MinHashesUDFWrapper"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.knn.lsh.MinHashesUDFWrapper",
+      "minhashes",
+      exprs
+    )
   }
 
   /**
    * Returns new features with `1.0` (bias) appended to the input features.
+   * @see [[hivemall.ftvec.AddBiasUDFWrapper]]
    * @group ftvec
    */
   def add_bias(expr: Column): Column = withExpr {
-    HiveGenericUDF("add_bias",
-      new HiveFunctionWrapper("hivemall.ftvec.AddBiasUDFWrapper"),
-      expr.expr :: Nil)
+    planHiveGenericUDF(
+      "hivemall.ftvec.AddBiasUDFWrapper",
+      "add_bias",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.ExtractFeatureUdf
+   * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]]
    * @group ftvec
    *
    * TODO: This throws java.lang.ClassCastException because
    * HiveInspectors.toInspector has a bug in spark.
    * Need to fix it later.
    */
-  def extract_feature(expr: Column): Column = {
-    val hiveUdf = HiveGenericUDF(
+  def extract_feature(expr: Column): Column = withExpr {
+    planHiveGenericUDF(
+      "hivemall.ftvec.ExtractFeatureUDFWrapper",
       "extract_feature",
-      new HiveFunctionWrapper("hivemall.ftvec.ExtractFeatureUDFWrapper"),
-      expr.expr :: Nil)
-    Column(hiveUdf).as("feature")
-  }
+      expr :: Nil
+    )
+  }.as("feature")
 
   /**
-   * @see hivemall.ftvec.ExtractWeightUdf
+   * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]]
    * @group ftvec
    *
    * TODO: This throws java.lang.ClassCastException because
    * HiveInspectors.toInspector has a bug in spark.
    * Need to fix it later.
    */
-  def extract_weight(expr: Column): Column = {
-    val hiveUdf = HiveGenericUDF(
+  def extract_weight(expr: Column): Column = withExpr {
+    planHiveGenericUDF(
+      "hivemall.ftvec.ExtractWeightUDFWrapper",
       "extract_weight",
-      new HiveFunctionWrapper("hivemall.ftvec.ExtractWeightUDFWrapper"),
-      expr.expr :: Nil)
-    Column(hiveUdf).as("value")
-  }
+      expr :: Nil
+    )
+  }.as("value")
 
   /**
-   * @see hivemall.ftvec.AddFeatureIndexUDFWrapper
+   * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]]
    * @group ftvec
    */
   def add_feature_index(expr: Column): Column = withExpr {
-    HiveGenericUDF("add_feature_index",
-      new HiveFunctionWrapper("hivemall.ftvec.AddFeatureIndexUDFWrapper"),
-      expr.expr :: Nil)
+    planHiveGenericUDF(
+      "hivemall.ftvec.AddFeatureIndexUDFWrapper",
+      "add_feature_index",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.SortByFeatureUDF
+   * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]]
    * @group ftvec
    */
   def sort_by_feature(expr: Column): Column = withExpr {
-    HiveGenericUDF("sort_by_feature",
-      new HiveFunctionWrapper("hivemall.ftvec.SortByFeatureUDFWrapper"),
-      expr.expr :: Nil)
+    planHiveGenericUDF(
+      "hivemall.ftvec.SortByFeatureUDFWrapper",
+      "sort_by_feature",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.hashing.MurmurHash3UDF
+   * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]]
    * @group ftvec.hashing
    */
   def mhash(expr: Column): Column = withExpr {
-    HiveSimpleUDF("mhash",
-      new HiveFunctionWrapper("hivemall.ftvec.hashing.MurmurHash3UDF"),
-      expr.expr :: Nil)
+    planHiveUDF(
+      "hivemall.ftvec.hashing.MurmurHash3UDF",
+      "mhash",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.hashing.Sha1UDF
+   * @see [[hivemall.ftvec.hashing.Sha1UDF]]
    * @group ftvec.hashing
    */
   def sha1(expr: Column): Column = withExpr {
-    HiveSimpleUDF("sha1",
-      new HiveFunctionWrapper("hivemall.ftvec.hashing.Sha1UDF"),
-      expr.expr :: Nil)
+    planHiveUDF(
+      "hivemall.ftvec.hashing.Sha1UDF",
+      "sha1",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.hashing.ArrayHashValuesUDF
+   * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]]
    * @group ftvec.hashing
    */
   @scala.annotation.varargs
   def array_hash_values(exprs: Column*): Column = withExpr {
     // TODO: Need a wrapper class because of using unsupported types
-    HiveSimpleUDF("array_hash_values",
-      new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayHashValuesUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.ftvec.hashing.ArrayHashValuesUDF",
+      "array_hash_values",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF
+   * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]]
    * @group ftvec.hashing
    */
   @scala.annotation.varargs
   def prefixed_hash_values(exprs: Column*): Column = withExpr {
     // TODO: Need a wrapper class because of using unsupported types
-    HiveSimpleUDF("prefixed_hash_values",
-      new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF",
+      "prefixed_hash_values",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.scaling.RescaleUDF
+   * @see [[hivemall.ftvec.scaling.RescaleUDF]]
    * @group ftvec.scaling
    */
   def rescale(value: Column, max: Column, min: Column): Column = withExpr {
-    HiveSimpleUDF("rescale",
-      new HiveFunctionWrapper("hivemall.ftvec.scaling.RescaleUDF"),
-      (value.cast(FloatType) :: max :: min :: Nil).map(_.expr))
+    planHiveUDF(
+      "hivemall.ftvec.scaling.RescaleUDF",
+      "rescale",
+      value.cast(FloatType) :: max :: min :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.scaling.ZScoreUDF
+   * @see [[hivemall.ftvec.scaling.ZScoreUDF]]
    * @group ftvec.scaling
    */
   @scala.annotation.varargs
   def zscore(exprs: Column*): Column = withExpr {
-    HiveSimpleUDF("zscore",
-      new HiveFunctionWrapper("hivemall.ftvec.scaling.ZScoreUDF"),
-      exprs.map(_.expr))
+    planHiveUDF(
+      "hivemall.ftvec.scaling.ZScoreUDF",
+      "zscore",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.scaling.L2NormalizationUDF
+   * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]]
    * @group ftvec.scaling
    */
   def normalize(expr: Column): Column = withExpr {
-    HiveGenericUDF("normalize",
-      new HiveFunctionWrapper("hivemall.ftvec.scaling.L2NormalizationUDFWrapper"),
-      expr.expr :: Nil)
+    planHiveGenericUDF(
+      "hivemall.ftvec.scaling.L2NormalizationUDFWrapper",
+      "normalize",
+      expr :: Nil
+    )
   }
 
   /**
-   * @see hivemall.ftvec.selection.ChiSquareUDF
+   * @see [[hivemall.ftvec.selection.ChiSquareUDF]]
    * @group ftvec.selection
    */
   def chi2(observed: Column, expected: Column): Column = withExpr {
-    HiveGenericUDF("chi2",
-      new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"),
-      Seq(observed.expr, expected.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.selection.ChiSquareUDF",
+      "chi2",
+      Seq(observed, expected)
+    )
   }
 
   /**
-   * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
+   * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]]
    * @group ftvec.conv
    */
   @scala.annotation.varargs
   def to_dense_features(exprs: Column*): Column = withExpr {
     // TODO: Need a wrapper class because of using unsupported types
-    HiveGenericUDF("to_dense_features",
-      new HiveFunctionWrapper("hivemall.ftvec.conv.ToDenseFeaturesUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.conv.ToDenseFeaturesUDF",
+      "to_dense_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.conv.ToSparseFeaturesUDF
+   * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]]
    * @group ftvec.conv
    */
   @scala.annotation.varargs
   def to_sparse_features(exprs: Column*): Column = withExpr {
     // TODO: Need a wrapper class because of using unsupported types
-    HiveGenericUDF("to_sparse_features",
-      new HiveFunctionWrapper("hivemall.ftvec.conv.ToSparseFeaturesUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.conv.ToSparseFeaturesUDF",
+      "to_sparse_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.VectorizeFeaturesUDF
+   * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def vectorize_features(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("vectorize_features",
-      new HiveFunctionWrapper("hivemall.ftvec.trans.VectorizeFeaturesUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.trans.VectorizeFeaturesUDF",
+      "vectorize_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.CategoricalFeaturesUDF
+   * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def categorical_features(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("categorical_features",
-      new HiveFunctionWrapper("hivemall.ftvec.trans.CategoricalFeaturesUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.trans.CategoricalFeaturesUDF",
+      "categorical_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.IndexedFeatures
+   * @see [[hivemall.ftvec.trans.IndexedFeatures]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def indexed_features(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("indexed_features",
-      new HiveFunctionWrapper("hivemall.ftvec.trans.IndexedFeatures"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.trans.IndexedFeatures",
+      "indexed_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.ftvec.trans.QuantitativeFeaturesUDF
+   * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]]
    * @group ftvec.trans
    */
   @scala.annotation.varargs
   def quantitative_features(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("quantitative_features",
-      new HiveFunctionWrapper("hivemall.ftvec.trans.QuantitativeFeaturesUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.ftvec.trans.QuantitativeFeaturesUDF",
+      "quantitative_features",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.smile.tools.TreePredictUDF
+   * @see [[hivemall.smile.tools.TreePredictUDF]]
    * @group misc
    */
   @scala.annotation.varargs
   def tree_predict(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("tree_predict",
-      new HiveFunctionWrapper("hivemall.smile.tools.TreePredictUDF"),
-      exprs.map(_.expr))
+    planHiveGenericUDF(
+      "hivemall.smile.tools.TreePredictUDF",
+      "tree_predict",
+      exprs
+    )
   }
 
   /**
-   * @see hivemall.tools.array.SelectKBestUDF
+   * @see [[hivemall.tools.array.SelectKBestUDF]]
    * @group tools.array
    */
   def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr {
-    HiveGenericUDF("select_k_best",
-      new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"),
-      Seq(X.expr, importanceList.expr, k.expr))
+    planHiveGenericUDF(
+      "hivemall.tools.array.SelectKBestUDF",
+      "select_k_best",
+      Seq(X, importanceList, k)
+    )
   }
 
   /**
-   * @see hivemall.tools.math.SigmoidUDF
+   * @see [[hivemall.tools.math.SigmoidGenericUDF]]
    * @group misc
    */
   def sigmoid(expr: Column): Column = {
@@ -1313,11 +1336,15 @@ object HivemallOps {
   }
 
   /**
-   * @see hivemall.tools.mapred.RowIdUDF
+   * @see [[hivemall.tools.mapred.RowIdUDFWrapper]]
    * @group misc
    */
   def rowid(): Column = withExpr {
-    HiveGenericUDF("rowid", new HiveFunctionWrapper("hivemall.tools.mapred.RowIdUDFWrapper"), Nil)
+    planHiveGenericUDF(
+      "hivemall.tools.mapred.RowIdUDFWrapper",
+      "rowid",
+       Nil
+    )
   }.as("rowid")
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
new file mode 100644
index 0000000..ab5c5fb
--- /dev/null
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.hive.internal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+
+/**
+ * Implementation helpers for [[org.apache.spark.sql.hive.HivemallOps]].
+ * This object wraps the internal Spark classes (e.g., `Generate` and `HiveGenericUDTF`) whose
+ * interfaces are unstable and may change in upcoming releases.
+ * Its purpose is to keep those unstable parts in one place,
+ * outside of [[org.apache.spark.sql.hive.HivemallOps]].
+ */
+private[hive] object HivemallOpsImpl extends Logging {
+
+  def planHiveUDF(
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column]): Expression = {
+    HiveSimpleUDF(
+      name = funcName,
+      funcWrapper = new HiveFunctionWrapper(className),
+      children = argumentExprs.map(_.expr)
+     )
+  }
+
+  def planHiveGenericUDF(
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column]): Expression = {
+    HiveGenericUDF(
+      name = funcName,
+      funcWrapper = new HiveFunctionWrapper(className),
+      children = argumentExprs.map(_.expr)
+     )
+  }
+
+  def planHiveGenericUDTF(
+      df: DataFrame,
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column],
+      outputAttrNames: Seq[String]): LogicalPlan = {
+    Generate(
+      generator = HiveGenericUDTF(
+        name = funcName,
+        funcWrapper = new HiveFunctionWrapper(className),
+        children = argumentExprs.map(_.expr)
+      ),
+      join = false,
+      outer = false,
+      qualifier = None,
+      generatorOutput = outputAttrNames.map(UnresolvedAttribute(_)),
+      child = df.logicalPlan)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 49773cc..493feda 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -357,7 +357,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
           testData("features"), lit(true)).as("predicted")
       )
       .groupBy($"rowid")
-      .rf_ensemble("predicted").as("rowid", "predicted")
+      .rf_ensemble("predicted").toDF("rowid", "predicted")
       .select($"predicted.label")
 
     checkAnswer(predicted, Seq(Row(0), Row(1)))
@@ -447,7 +447,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     val testDf = Seq(
       (Array(0.3, 0.1, 0.2), 1),
       (Array(0.3, 0.1, 0.2), 0),
-      (Array(0.3, 0.1, 0.2), 0)).toDF.as("features", "label")
+      (Array(0.3, 0.1, 0.2), 0)).toDF("features", "label")
     Seq(
       "train_randomforest_regr",
       "train_randomforest_classifier"
@@ -469,7 +469,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
       } else {
         res.groupBy("feature").argmin_kld("weight", "conv")
       }
-    }.as("feature", "weight")
+    }.toDF("feature", "weight")
 
     // Data preparation
     val testDf = LargeRegrTrainData
@@ -485,14 +485,14 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
       .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
       .select($"rowid", ($"weight" * $"value").as("value"))
       .groupBy("rowid").sum("value")
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")
 
     // Evaluation
     val eval = predict
       .join(testDf, predict("rowid") === testDf("rowid"))
       .groupBy()
       .agg(Map("target" -> "avg", "predicted" -> "avg"))
-      .as("target", "predicted")
+      .toDF("target", "predicted")
 
     val diff = eval.map {
       case Row(target: Double, predicted: Double) =>
@@ -514,7 +514,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
       } else {
         res.groupBy("feature").argmin_kld("weight", "conv")
       }
-    }.as("feature", "weight")
+    }.toDF("feature", "weight")
 
     // Data preparation
     val testDf = LargeClassifierTestData
@@ -537,7 +537,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
        * Perhaps you need to use aliases.
        */
       .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")
 
     // Evaluation
     val eval = predict
@@ -586,31 +586,32 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
   test("user-defined aggregators for ensembles") {
     import hiveContext.implicits._
 
-    val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.as("c0", "c1")
+    val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF("c0", "c1")
     val row1 = df1.groupBy($"c0").voted_avg("c1").collect
     assert(row1(0).getDouble(1) ~== 0.15)
     assert(row1(1).getDouble(1) ~== 0.10)
 
-    val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.as("c0", "c1")
+    val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF("c0", "c1")
     val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect
     assert(row3(0).getDouble(1) ~== 0.50)
     assert(row3(1).getDouble(1) ~== 0.30)
 
-    val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.as("c0", "c1", "c2")
+    val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF("c0", "c1", "c2")
     val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect
     assert(row5(0).getFloat(1) ~== 0.266666666)
     assert(row5(1).getFloat(1) ~== 0.80)
 
-    val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.as("c0", "c1", "c2")
+    val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF("c0", "c1", "c2")
     val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect
     assert(row6(0).getString(1) == "id-1")
 
-    val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.as("c0", "c1", "c2")
-    val row7 = df7.groupBy($"c0").maxrow("c2", "c1").as("c0", "c1").select($"c1.col1").collect
+    val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF("c0", "c1", "c2")
+    val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect
     assert(row7(0).getString(0) == "id-0")
 
-    val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.as("c0", "c1")
-    val row8 = df8.groupBy($"c0").rf_ensemble("c1").as("c0", "c1").select("c1.probability").collect
+    val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1")
+    val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1")
+      .select("c1.probability").collect
     assert(row8(0).getDouble(0) ~== 0.3333333333)
     assert(row8(1).getDouble(0) ~== 1.0)
   }
@@ -618,20 +619,20 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
   test("user-defined aggregators for evaluation") {
     import hiveContext.implicits._
 
-    val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF.as("c0", "c1", "c2")
+    val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF("c0", "c1", "c2")
     val row1 = df1.groupBy($"c0").mae("c1", "c2").collect
     assert(row1(0).getDouble(1) ~== 0.26666666)
 
-    val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
+    val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
     val row2 = df2.groupBy($"c0").mse("c1", "c2").collect
     assert(row2(0).getDouble(1) ~== 0.29999999)
 
-    val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
+    val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
     val row3 = df3.groupBy($"c0").rmse("c1", "c2").collect
     assert(row3(0).getDouble(1) ~== 0.54772253)
 
     val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF
-      .as("c0", "c1", "c2")
+      .toDF("c0", "c1", "c2")
     val row4 = df4.groupBy($"c0").f1score("c1", "c2").collect
     assert(row4(0).getDouble(1) ~== 0.25)
   }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
index 3b87a96..06a4dc0 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
@@ -170,7 +170,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
         } else {
           res.groupBy("feature").argmin_kld("weight", "conv")
         }
-      }.as("feature", "weight")
+      }.toDF("feature", "weight")
 
       // Data preparation
       val testDf = testA9aData
@@ -186,14 +186,14 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
         .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
         .select($"rowid", ($"weight" * $"value").as("value"))
         .groupBy("rowid").sum("value")
-        .as("rowid", "predicted")
+        .toDF("rowid", "predicted")
 
       // Evaluation
       val eval = predict
         .join(testDf, predict("rowid") === testDf("rowid"))
         .groupBy()
         .agg(Map("target" -> "avg", "predicted" -> "avg"))
-        .as("target", "predicted")
+        .toDF("target", "predicted")
 
       val (target, predicted) = eval.map {
         case Row(target: Double, predicted: Double) => (target, predicted)
@@ -238,7 +238,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
         } else {
           res.groupBy("feature").argmin_kld("weight", "conv")
         }
-      }.as("feature", "weight")
+      }.toDF("feature", "weight")
 
       // Data preparation
       val testDf = testKdd2010aData
@@ -255,7 +255,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
         .select($"rowid", ($"weight" * $"value").as("value"))
         .groupBy("rowid").sum("value")
         .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
-        .as("rowid", "predicted")
+        .toDF("rowid", "predicted")
 
       // Evaluation
       val eval = predict

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
index 8c9c0c3..37e0989 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
@@ -75,7 +75,7 @@ final class XGBoostSuite extends VectorQueryTest {
       val predict = model.join(mllibTestDf)
         .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
         .groupBy("rowid").avg()
-        .as("rowid", "predicted")
+        .toDF("rowid", "predicted")
 
       val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
         .select(predict("rowid"), $"predicted", $"label")
@@ -100,7 +100,7 @@ final class XGBoostSuite extends VectorQueryTest {
       val predict = model.join(mllibTestDf)
         .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
         .groupBy("rowid").avg()
-        .as("rowid", "predicted")
+        .toDF("rowid", "predicted")
 
       val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
         .select(

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
index 598479d..b15c77c 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
@@ -143,7 +143,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
           testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
             .select($"rowid", ($"weight" * $"value").as("value"))
             .groupBy("rowid").sum("value")
-            .as("rowid", "value")
+            .toDF("rowid", "value")
             .select($"rowid", sigmoid($"value"))
         }