You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@hivemall.apache.org by ya...@apache.org on 2017/01/27 02:57:38 UTC
incubator-hivemall git commit: Close #29: [HIVEMALL-39][SPARK] Put the use of HiveUDFs in one place
Repository: incubator-hivemall
Updated Branches:
refs/heads/master 70f42038a -> 88fae5208
Close #29: [HIVEMALL-39][SPARK] Put the use of HiveUDFs in one place
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/88fae520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/88fae520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/88fae520
Branch: refs/heads/master
Commit: 88fae52085f8e29d36cc038879b1d2a41f383a6c
Parents: 70f4203
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Fri Jan 27 11:57:27 2017 +0900
Committer: Takeshi YAMAMURO <li...@gmail.com>
Committed: Fri Jan 27 11:57:27 2017 +0900
----------------------------------------------------------------------
.../org/apache/spark/sql/hive/HivemallOps.scala | 1109 +++++++++---------
.../sql/hive/internal/HivemallOpsImpl.scala | 78 ++
.../spark/sql/hive/HivemallOpsSuite.scala | 39 +-
.../spark/sql/hive/ModelMixingSuite.scala | 10 +-
.../apache/spark/sql/hive/XGBoostSuite.scala | 4 +-
.../streaming/HivemallOpsWithFeatureSuite.scala | 2 +-
6 files changed, 674 insertions(+), 568 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index e3e20ee..6e588a9 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal, NamedExpression, UserDefinedGenerator}
import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project}
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -54,607 +53,563 @@ import org.apache.spark.unsafe.types.UTF8String
* @groupname misc
*/
final class HivemallOps(df: DataFrame) extends Logging {
+ import internal.HivemallOpsImpl._
private[this] val _sparkSession = df.sparkSession
private[this] val _analyzer = _sparkSession.sessionState.analyzer
/**
- * @see hivemall.regression.AdaDeltaUDTF
+ * @see [[hivemall.regression.AdaDeltaUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_adadelta(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_adadelta",
- new HiveFunctionWrapper("hivemall.regression.AdaDeltaUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AdaDeltaUDTF",
+ "train_adadelta",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.AdaGradUDTF
+ * @see [[hivemall.regression.AdaGradUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_adagrad(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_adagrad",
- new HiveFunctionWrapper("hivemall.regression.AdaGradUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AdaGradUDTF",
+ "train_adagrad",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.AROWRegressionUDTF
+ * @see [[hivemall.regression.AROWRegressionUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_arow_regr",
- new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF",
+ "train_arow_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.regression.AROWRegressionUDTF$AROWe
+ * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]]
* @group regression
*/
@scala.annotation.varargs
def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_arowe_regr",
- new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF$AROWe",
+ "train_arowe_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.regression.AROWRegressionUDTF$AROWe2
+ * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]]
* @group regression
*/
@scala.annotation.varargs
def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_arowe2_regr",
- new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe2"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF$AROWe2",
+ "train_arowe2_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.regression.LogressUDTF
+ * @see [[hivemall.regression.LogressUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_logregr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_logregr",
- new HiveFunctionWrapper("hivemall.regression.LogressUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.LogressUDTF",
+ "train_logregr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.PassiveAggressiveRegressionUDTF
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa1_regr",
- new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF",
+ "train_pa1_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]]
* @group regression
*/
@scala.annotation.varargs
def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa1a_regr",
- new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a",
+ "train_pa1a_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]]
* @group regression
*/
@scala.annotation.varargs
def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa2_regr",
- new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2",
+ "train_pa2_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]]
* @group regression
*/
@scala.annotation.varargs
def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa2a_regr",
- new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a",
+ "train_pa2a_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.smile.regression.RandomForestRegressionUDTF
+ * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]]
* @group regression
*/
@scala.annotation.varargs
def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_randomforest_regr",
- new HiveFunctionWrapper("hivemall.smile.regression.RandomForestRegressionUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
+ planHiveGenericUDTF(
+ df,
+ "hivemall.smile.regression.RandomForestRegressionUDTF",
+ "train_randomforest_regr",
+ setMixServs(toHivemallFeatures(exprs)),
Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
- .map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ )
}
/**
- * @see hivemall.classifier.PerceptronUDTF
+ * @see [[hivemall.classifier.PerceptronUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_perceptron(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_perceptron",
- new HiveFunctionWrapper("hivemall.classifier.PerceptronUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PerceptronUDTF",
+ "train_perceptron",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.PassiveAggressiveUDTF
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_pa(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa",
- new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF",
+ "train_pa",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.PassiveAggressiveUDTF$PA1
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]]
* @group classifier
*/
@scala.annotation.varargs
def train_pa1(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa1",
- new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA1"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF$PA1",
+ "train_pa1",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.PassiveAggressiveUDTF$PA2
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]]
* @group classifier
*/
@scala.annotation.varargs
def train_pa2(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_pa2",
- new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA2"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF$PA2",
+ "train_pa2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.ConfidenceWeightedUDTF
+ * @see [[hivemall.classifier.ConfidenceWeightedUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_cw(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_cw",
- new HiveFunctionWrapper("hivemall.classifier.ConfidenceWeightedUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.ConfidenceWeightedUDTF",
+ "train_cw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.AROWClassifierUDTF
+ * @see [[hivemall.classifier.AROWClassifierUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_arow(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_arow",
- new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AROWClassifierUDTF",
+ "train_arow",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.AROWClassifierUDTF$AROWh
+ * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]]
* @group classifier
*/
@scala.annotation.varargs
def train_arowh(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_arowh",
- new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF$AROWh"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AROWClassifierUDTF$AROWh",
+ "train_arowh",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
+ * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]]
* @group classifier
*/
@scala.annotation.varargs
def train_scw(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_scw",
- new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW1"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1",
+ "train_scw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
+ * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]]
* @group classifier
*/
@scala.annotation.varargs
def train_scw2(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_scw2",
- new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW2"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2",
+ "train_scw2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.classifier.AdaGradRDAUDTF
+ * @see [[hivemall.classifier.AdaGradRDAUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_adagrad_rda",
- new HiveFunctionWrapper("hivemall.classifier.AdaGradRDAUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AdaGradRDAUDTF",
+ "train_adagrad_rda",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
}
/**
- * @see hivemall.smile.classification.RandomForestClassifierUDTF
+ * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]]
* @group classifier
*/
@scala.annotation.varargs
def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_randomforest_classifier",
- new HiveFunctionWrapper("hivemall.smile.classification.RandomForestClassifierUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
+ planHiveGenericUDTF(
+ df,
+ "hivemall.smile.classification.RandomForestClassifierUDTF",
+ "train_randomforest_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
- .map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ )
}
/**
- * @see hivemall.classifier.classifier.MulticlassPerceptronUDTF
+ * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_perceptron(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_perceptron",
- new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPerceptronUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPerceptronUDTF",
+ "train_multiclass_perceptron",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.classifier.PassiveAggressiveUDTF
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_pa",
- new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF",
+ "train_multiclass_pa",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA1
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_pa1",
- new HiveFunctionWrapper(
- "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1",
+ "train_multiclass_pa1",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA2
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_pa2",
- new HiveFunctionWrapper(
- "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2",
+ "train_multiclass_pa2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
}
/**
- * @see hivemall.classifier.classifier.MulticlassConfidenceWeightedUDTF
+ * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_cw",
- new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF",
+ "train_multiclass_cw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.classifier.MulticlassAROWClassifierUDTF
+ * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_arow",
- new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF",
+ "train_multiclass_arow",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW1
+ * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_scw",
- new HiveFunctionWrapper(
- "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1"),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1",
+ "train_multiclass_scw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
}
/**
- * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW2
+ * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]]
* @group classifier.multiclass
*/
@scala.annotation.varargs
def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_multiclass_scw2",
- new HiveFunctionWrapper(
- "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2"
- ),
- setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2",
+ "train_multiclass_scw2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
}
/**
* :: Experimental ::
- * @see hivemall.xgboost.regression.XGBoostRegressionUDTF
+ * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]]
* @group xgboost
*/
@Experimental
@scala.annotation.varargs
def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_xgboost_regr",
- new HiveFunctionWrapper("hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper"),
- toHivemallFeatureDf(exprs : _*).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper",
+ "train_xgboost_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
}
/**
* :: Experimental ::
- * @see hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF
+ * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]]
* @group xgboost
*/
@Experimental
@scala.annotation.varargs
def train_xgboost_classifier(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_xgboost_classifier",
- new HiveFunctionWrapper(
- "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper"),
- toHivemallFeatureDf(exprs : _*).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper",
+ "train_xgboost_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
}
/**
* :: Experimental ::
- * @see hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF
+ * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]]
* @group xgboost
*/
@Experimental
@scala.annotation.varargs
def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "train_xgboost_multiclass_classifier",
- new HiveFunctionWrapper(
- "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper"
- ),
- toHivemallFeatureDf(exprs: _*).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("model_id", "pred_model").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper",
+ "train_xgboost_multiclass_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
}
/**
* :: Experimental ::
- * @see hivemall.xgboost.tools.XGBoostPredictUDTF
+ * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]]
* @group xgboost
*/
@Experimental
@scala.annotation.varargs
def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "xgboost_predict",
- new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostPredictUDTF"),
- toHivemallFeatureDf(exprs: _*).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("rowid", "predicted").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.tools.XGBoostPredictUDTF",
+ "xgboost_predict",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("rowid", "predicted")
+ )
}
/**
* :: Experimental ::
- * @see hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF
+ * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]]
* @group xgboost
*/
@Experimental
@scala.annotation.varargs
def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "xgboost_multiclass_predict",
- new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF"),
- toHivemallFeatureDf(exprs: _*).map(_.expr)
- ),
- join = false, outer = false, None,
- Seq("rowid", "label", "probability").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF",
+ "xgboost_multiclass_predict",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("rowid", "label", "probability")
+ )
}
/**
- * @see hivemall.knn.lsh.MinHashUDTF
+ * @see [[hivemall.knn.lsh.MinHashUDTF]]
* @group knn.lsh
*/
@scala.annotation.varargs
def minhash(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "minhash",
- new HiveFunctionWrapper("hivemall.knn.lsh.MinHashUDTF"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- Seq("clusterid", "item").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.knn.lsh.MinHashUDTF",
+ "minhash",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("clusterid", "item")
+ )
}
/**
- * @see hivemall.ftvec.amplify.AmplifierUDTF
+ * @see [[hivemall.ftvec.amplify.AmplifierUDTF]]
* @group ftvec.amplify
*/
@scala.annotation.varargs
@@ -663,17 +618,17 @@ final class HivemallOps(df: DataFrame) extends Logging {
case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name)
case Column(expr: Expression) => UnresolvedAttribute(expr.simpleString)
}
- Generate(HiveGenericUDTF(
- "amplify",
- new HiveFunctionWrapper("hivemall.ftvec.amplify.AmplifierUDTF"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- outputAttr,
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.amplify.AmplifierUDTF",
+ "amplify",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("clusterid", "item")
+ )
}
/**
- * @see hivemall.ftvec.amplify.RandomAmplifierUDTF
+ * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]]
* @group ftvec.amplify
*/
@scala.annotation.varargs
@@ -702,48 +657,48 @@ final class HivemallOps(df: DataFrame) extends Logging {
/**
* Quantifies input columns.
- * @see hivemall.ftvec.conv.QuantifyColumnsUDTF
+ * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]]
* @group ftvec.conv
*/
@scala.annotation.varargs
def quantify(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "quantify",
- new HiveFunctionWrapper("hivemall.ftvec.conv.QuantifyColumnsUDTF"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.conv.QuantifyColumnsUDTF",
+ "quantify",
+ setMixServs(toHivemallFeatures(exprs)),
+ (0 until exprs.size - 1).map(i => s"c$i")
+ )
}
/**
- * @see hivemall.ftvec.trans.BinarizeLabelUDTF
+ * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def binarize_label(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "binarize_label",
- new HiveFunctionWrapper("hivemall.ftvec.trans.BinarizeLabelUDTF"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.trans.BinarizeLabelUDTF",
+ "binarize_label",
+ setMixServs(toHivemallFeatures(exprs)),
+ (0 until exprs.size - 1).map(i => s"c$i")
+ )
}
/**
- * @see hivemall.ftvec.trans.QuantifiedFeaturesUDTF
+ * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def quantified_features(exprs: Column*): DataFrame = withTypedPlan {
- Generate(HiveGenericUDTF(
- "quantified_features",
- new HiveFunctionWrapper("hivemall.ftvec.trans.QuantifiedFeaturesUDTF"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- Seq("features").map(UnresolvedAttribute(_)),
- df.logicalPlan)
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.trans.QuantifiedFeaturesUDTF",
+ "quantified_features",
+ toHivemallFeatures(exprs), // no setMixServs: pass args through as the pre-refactor code did
+ Seq("features")
+ )
}
/**
@@ -758,7 +713,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
}
/**
- * Splits `org.apache.spark.ml.linalg.Vector` into pieces.
+ * Splits [[Vector]] into pieces.
* @group ftvec
*/
def explode_vector(expr: Column): DataFrame = {
@@ -811,37 +766,28 @@ final class HivemallOps(df: DataFrame) extends Logging {
}
/**
- * Returns a new [[DataFrame]] with columns renamed.
- * This is a wrapper for DataFrame#toDF.
+ * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper]]
* @group misc
*/
@scala.annotation.varargs
- def as(colNames: String*): DataFrame = df.toDF(colNames: _*)
+ def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper",
+ "lr_datagen",
+ toHivemallFeatures(exprs), // no setMixServs: pass args through as the pre-refactor code did
+ Seq("label", "features")
+ )
+ }
/**
* Returns all the columns as Seq[Column] in this [[DataFrame]].
+ * Internal helper (`private[sql]`), not part of the public API; columns follow schema order.
- * @group misc
*/
- def cols: Seq[Column] = {
+ private[sql] def cols: Seq[Column] = {
df.schema.fields.map(col => df.col(col.name)).toSeq
}
/**
- * @see hivemall.dataset.LogisticRegressionDataGeneratorUDTF
- * @group misc
- */
- @scala.annotation.varargs
- def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan {
- Generate(HiveGenericUDTF(
- "lr_datagen",
- new HiveFunctionWrapper("hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper"),
- exprs.map(_.expr)),
- join = false, outer = false, None,
- Seq("label", "features").map(UnresolvedAttribute(_)),
- df.logicalPlan)
- }
-
- /**
* :: Experimental ::
* If a parameter '-mix' does not exist in a 3rd argument,
* set it from an environmental variable
@@ -868,7 +814,10 @@ final class HivemallOps(df: DataFrame) extends Logging {
}
}
- @inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = {
+ /**
+ * If the input is a [[Vector]], transform it into Hivemall features.
+ */
+ @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = {
df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map {
case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c)
case (_, c) => c
@@ -886,6 +835,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
}
object HivemallOps {
+ import internal.HivemallOpsImpl._
/**
* Implicitly inject the [[HivemallOps]] into [[DataFrame]].
@@ -894,417 +844,490 @@ object HivemallOps {
new HivemallOps(df)
/**
- * @see hivemall.HivemallVersionUDF
+ * @see [[hivemall.HivemallVersionUDF]]
* @group misc
*/
def hivemall_version(): Column = withExpr {
- HiveSimpleUDF("hivemall_version", new HiveFunctionWrapper("hivemall.HivemallVersionUDF"), Nil)
+ planHiveUDF(
+ className = "hivemall.HivemallVersionUDF",
+ funcName = "hivemall_version",
+ argumentExprs = Nil
+ )
}
/**
- * @see hivemall.knn.similarity.CosineSimilarityUDF
+ * @see [[hivemall.knn.similarity.CosineSimilarityUDF]]
* @group knn.similarity
*/
@scala.annotation.varargs
def cosine_sim(exprs: Column*): Column = withExpr {
- HiveGenericUDF("cosine_sim",
- new HiveFunctionWrapper("hivemall.knn.similarity.CosineSimilarityUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.similarity.CosineSimilarityUDF",
+ funcName = "cosine_sim",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.similarity.JaccardIndexUDF
+ * @see [[hivemall.knn.similarity.JaccardIndexUDF]]
* @group knn.similarity
*/
@scala.annotation.varargs
def jaccard(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("jaccard",
- new HiveFunctionWrapper("hivemall.knn.similarity.JaccardIndexUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.knn.similarity.JaccardIndexUDF",
+ funcName = "jaccard",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.similarity.AngularSimilarityUDF
+ * @see [[hivemall.knn.similarity.AngularSimilarityUDF]]
* @group knn.similarity
*/
@scala.annotation.varargs
def angular_similarity(exprs: Column*): Column = withExpr {
- HiveGenericUDF("angular_similarity",
- new HiveFunctionWrapper("hivemall.knn.similarity.AngularSimilarityUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.similarity.AngularSimilarityUDF",
+ funcName = "angular_similarity",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.similarity.EuclidSimilarity
+ * @see [[hivemall.knn.similarity.EuclidSimilarity]]
* @group knn.similarity
*/
@scala.annotation.varargs
def euclid_similarity(exprs: Column*): Column = withExpr {
- HiveGenericUDF("euclid_similarity",
- new HiveFunctionWrapper("hivemall.knn.similarity.EuclidSimilarity"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.similarity.EuclidSimilarity",
+ funcName = "euclid_similarity",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.similarity.Distance2SimilarityUDF
+ * @see [[hivemall.knn.similarity.Distance2SimilarityUDF]]
* @group knn.similarity
*/
@scala.annotation.varargs
def distance2similarity(exprs: Column*): Column = withExpr {
// TODO: Need a wrapper class because of using unsupported types
- HiveGenericUDF("distance2similarity",
- new HiveFunctionWrapper("hivemall.knn.similarity.Distance2SimilarityUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.similarity.Distance2SimilarityUDF",
+ funcName = "distance2similarity",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.HammingDistanceUDF
+ * @see [[hivemall.knn.distance.HammingDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def hamming_distance(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("hamming_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.HammingDistanceUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.knn.distance.HammingDistanceUDF",
+ funcName = "hamming_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.PopcountUDF
+ * @see [[hivemall.knn.distance.PopcountUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def popcnt(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("popcnt",
- new HiveFunctionWrapper("hivemall.knn.distance.PopcountUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.knn.distance.PopcountUDF",
+ funcName = "popcnt",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.KLDivergenceUDF
+ * @see [[hivemall.knn.distance.KLDivergenceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def kld(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("kld",
- new HiveFunctionWrapper("hivemall.knn.distance.KLDivergenceUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.knn.distance.KLDivergenceUDF",
+ funcName = "kld",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.EuclidDistanceUDF
+ * @see [[hivemall.knn.distance.EuclidDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def euclid_distance(exprs: Column*): Column = withExpr {
- HiveGenericUDF("euclid_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.EuclidDistanceUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.distance.EuclidDistanceUDF",
+ funcName = "euclid_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.CosineDistanceUDF
+ * @see [[hivemall.knn.distance.CosineDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def cosine_distance(exprs: Column*): Column = withExpr {
- HiveGenericUDF("cosine_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.CosineDistanceUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.distance.CosineDistanceUDF",
+ funcName = "cosine_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.AngularDistanceUDF
+ * @see [[hivemall.knn.distance.AngularDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def angular_distance(exprs: Column*): Column = withExpr {
- HiveGenericUDF("angular_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.AngularDistanceUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.distance.AngularDistanceUDF",
+ funcName = "angular_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.ManhattanDistanceUDF
+ * @see [[hivemall.knn.distance.ManhattanDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def manhattan_distance(exprs: Column*): Column = withExpr {
- HiveGenericUDF("manhattan_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.ManhattanDistanceUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.distance.ManhattanDistanceUDF",
+ funcName = "manhattan_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.distance.MinkowskiDistanceUDF
+ * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]]
* @group knn.distance
*/
@scala.annotation.varargs
def minkowski_distance (exprs: Column*): Column = withExpr {
- HiveGenericUDF("minkowski_distance",
- new HiveFunctionWrapper("hivemall.knn.distance.MinkowskiDistanceUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.distance.MinkowskiDistanceUDF",
+ funcName = "minkowski_distance",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.lsh.bBitMinHashUDF
+ * @see [[hivemall.knn.lsh.bBitMinHashUDF]]
* @group knn.lsh
*/
@scala.annotation.varargs
def bbit_minhash(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("bbit_minhash",
- new HiveFunctionWrapper("hivemall.knn.lsh.bBitMinHashUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.knn.lsh.bBitMinHashUDF",
+ funcName = "bbit_minhash",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.knn.lsh.MinHashesUDF
+ * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]]
* @group knn.lsh
*/
@scala.annotation.varargs
def minhashes(exprs: Column*): Column = withExpr {
- HiveGenericUDF("minhashes",
- new HiveFunctionWrapper("hivemall.knn.lsh.MinHashesUDFWrapper"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.knn.lsh.MinHashesUDFWrapper",
+ funcName = "minhashes",
+ argumentExprs = exprs
+ )
}
/**
* Returns new features with `1.0` (bias) appended to the input features.
+ * @see [[hivemall.ftvec.AddBiasUDFWrapper]]
* @group ftvec
*/
def add_bias(expr: Column): Column = withExpr {
- HiveGenericUDF("add_bias",
- new HiveFunctionWrapper("hivemall.ftvec.AddBiasUDFWrapper"),
- expr.expr :: Nil)
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.AddBiasUDFWrapper",
+ funcName = "add_bias",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.ExtractFeatureUdf
+ * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]]
* @group ftvec
*
* TODO: This throws java.lang.ClassCastException because
* HiveInspectors.toInspector has a bug in spark.
* Need to fix it later.
*/
- def extract_feature(expr: Column): Column = {
- val hiveUdf = HiveGenericUDF(
+ def extract_feature(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.ExtractFeatureUDFWrapper",
"extract_feature",
- new HiveFunctionWrapper("hivemall.ftvec.ExtractFeatureUDFWrapper"),
- expr.expr :: Nil)
- Column(hiveUdf).as("feature")
- }
+ argumentExprs = expr :: Nil
+ )
+ }.as("feature")
/**
- * @see hivemall.ftvec.ExtractWeightUdf
+ * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]]
* @group ftvec
*
* TODO: This throws java.lang.ClassCastException because
* HiveInspectors.toInspector has a bug in spark.
* Need to fix it later.
*/
- def extract_weight(expr: Column): Column = {
- val hiveUdf = HiveGenericUDF(
+ def extract_weight(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.ExtractWeightUDFWrapper",
"extract_weight",
- new HiveFunctionWrapper("hivemall.ftvec.ExtractWeightUDFWrapper"),
- expr.expr :: Nil)
- Column(hiveUdf).as("value")
- }
+ argumentExprs = expr :: Nil
+ )
+ }.as("value")
/**
- * @see hivemall.ftvec.AddFeatureIndexUDFWrapper
+ * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]]
* @group ftvec
*/
def add_feature_index(expr: Column): Column = withExpr {
- HiveGenericUDF("add_feature_index",
- new HiveFunctionWrapper("hivemall.ftvec.AddFeatureIndexUDFWrapper"),
- expr.expr :: Nil)
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.AddFeatureIndexUDFWrapper",
+ funcName = "add_feature_index",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.SortByFeatureUDF
+ * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]]
* @group ftvec
*/
def sort_by_feature(expr: Column): Column = withExpr {
- HiveGenericUDF("sort_by_feature",
- new HiveFunctionWrapper("hivemall.ftvec.SortByFeatureUDFWrapper"),
- expr.expr :: Nil)
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.SortByFeatureUDFWrapper",
+ funcName = "sort_by_feature",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.hashing.MurmurHash3UDF
+ * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]]
* @group ftvec.hashing
*/
def mhash(expr: Column): Column = withExpr {
- HiveSimpleUDF("mhash",
- new HiveFunctionWrapper("hivemall.ftvec.hashing.MurmurHash3UDF"),
- expr.expr :: Nil)
+ planHiveUDF(
+ className = "hivemall.ftvec.hashing.MurmurHash3UDF",
+ funcName = "mhash",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.hashing.Sha1UDF
+ * @see [[hivemall.ftvec.hashing.Sha1UDF]]
* @group ftvec.hashing
*/
def sha1(expr: Column): Column = withExpr {
- HiveSimpleUDF("sha1",
- new HiveFunctionWrapper("hivemall.ftvec.hashing.Sha1UDF"),
- expr.expr :: Nil)
+ planHiveUDF(
+ className = "hivemall.ftvec.hashing.Sha1UDF",
+ funcName = "sha1",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.hashing.ArrayHashValuesUDF
+ * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]]
* @group ftvec.hashing
*/
@scala.annotation.varargs
def array_hash_values(exprs: Column*): Column = withExpr {
// TODO: Need a wrapper class because of using unsupported types
- HiveSimpleUDF("array_hash_values",
- new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayHashValuesUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.ftvec.hashing.ArrayHashValuesUDF",
+ funcName = "array_hash_values",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF
+ * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]]
* @group ftvec.hashing
*/
@scala.annotation.varargs
def prefixed_hash_values(exprs: Column*): Column = withExpr {
// TODO: Need a wrapper class because of using unsupported types
- HiveSimpleUDF("prefixed_hash_values",
- new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF",
+ funcName = "prefixed_hash_values",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.scaling.RescaleUDF
+ * @see [[hivemall.ftvec.scaling.RescaleUDF]]
* @group ftvec.scaling
*/
def rescale(value: Column, max: Column, min: Column): Column = withExpr {
- HiveSimpleUDF("rescale",
- new HiveFunctionWrapper("hivemall.ftvec.scaling.RescaleUDF"),
- (value.cast(FloatType) :: max :: min :: Nil).map(_.expr))
+ planHiveUDF(
+ className = "hivemall.ftvec.scaling.RescaleUDF",
+ funcName = "rescale",
+ argumentExprs = value.cast(FloatType) :: max :: min :: Nil
+ )
}
/**
- * @see hivemall.ftvec.scaling.ZScoreUDF
+ * @see [[hivemall.ftvec.scaling.ZScoreUDF]]
* @group ftvec.scaling
*/
@scala.annotation.varargs
def zscore(exprs: Column*): Column = withExpr {
- HiveSimpleUDF("zscore",
- new HiveFunctionWrapper("hivemall.ftvec.scaling.ZScoreUDF"),
- exprs.map(_.expr))
+ planHiveUDF(
+ className = "hivemall.ftvec.scaling.ZScoreUDF",
+ funcName = "zscore",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.scaling.L2NormalizationUDF
+ * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]]
* @group ftvec.scaling
*/
def normalize(expr: Column): Column = withExpr {
- HiveGenericUDF("normalize",
- new HiveFunctionWrapper("hivemall.ftvec.scaling.L2NormalizationUDFWrapper"),
- expr.expr :: Nil)
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.scaling.L2NormalizationUDFWrapper",
+ funcName = "normalize",
+ argumentExprs = expr :: Nil
+ )
}
/**
- * @see hivemall.ftvec.selection.ChiSquareUDF
+ * @see [[hivemall.ftvec.selection.ChiSquareUDF]]
* @group ftvec.selection
*/
def chi2(observed: Column, expected: Column): Column = withExpr {
- HiveGenericUDF("chi2",
- new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"),
- Seq(observed.expr, expected.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.selection.ChiSquareUDF",
+ funcName = "chi2",
+ argumentExprs = Seq(observed, expected)
+ )
}
/**
- * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
+ * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]]
* @group ftvec.conv
*/
@scala.annotation.varargs
def to_dense_features(exprs: Column*): Column = withExpr {
// TODO: Need a wrapper class because of using unsupported types
- HiveGenericUDF("to_dense_features",
- new HiveFunctionWrapper("hivemall.ftvec.conv.ToDenseFeaturesUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.conv.ToDenseFeaturesUDF",
+ funcName = "to_dense_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.conv.ToSparseFeaturesUDF
+ * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]]
* @group ftvec.conv
*/
@scala.annotation.varargs
def to_sparse_features(exprs: Column*): Column = withExpr {
// TODO: Need a wrapper class because of using unsupported types
- HiveGenericUDF("to_sparse_features",
- new HiveFunctionWrapper("hivemall.ftvec.conv.ToSparseFeaturesUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.conv.ToSparseFeaturesUDF",
+ funcName = "to_sparse_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.trans.VectorizeFeaturesUDF
+ * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def vectorize_features(exprs: Column*): Column = withExpr {
- HiveGenericUDF("vectorize_features",
- new HiveFunctionWrapper("hivemall.ftvec.trans.VectorizeFeaturesUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.trans.VectorizeFeaturesUDF",
+ funcName = "vectorize_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.trans.CategoricalFeaturesUDF
+ * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def categorical_features(exprs: Column*): Column = withExpr {
- HiveGenericUDF("categorical_features",
- new HiveFunctionWrapper("hivemall.ftvec.trans.CategoricalFeaturesUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.trans.CategoricalFeaturesUDF",
+ funcName = "categorical_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.trans.IndexedFeatures
+ * @see [[hivemall.ftvec.trans.IndexedFeatures]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def indexed_features(exprs: Column*): Column = withExpr {
- HiveGenericUDF("indexed_features",
- new HiveFunctionWrapper("hivemall.ftvec.trans.IndexedFeatures"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.trans.IndexedFeatures",
+ funcName = "indexed_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.ftvec.trans.QuantitativeFeaturesUDF
+ * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]]
* @group ftvec.trans
*/
@scala.annotation.varargs
def quantitative_features(exprs: Column*): Column = withExpr {
- HiveGenericUDF("quantitative_features",
- new HiveFunctionWrapper("hivemall.ftvec.trans.QuantitativeFeaturesUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.ftvec.trans.QuantitativeFeaturesUDF",
+ funcName = "quantitative_features",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.smile.tools.TreePredictUDF
+ * @see [[hivemall.smile.tools.TreePredictUDF]]
* @group misc
*/
@scala.annotation.varargs
def tree_predict(exprs: Column*): Column = withExpr {
- HiveGenericUDF("tree_predict",
- new HiveFunctionWrapper("hivemall.smile.tools.TreePredictUDF"),
- exprs.map(_.expr))
+ planHiveGenericUDF(
+ className = "hivemall.smile.tools.TreePredictUDF",
+ funcName = "tree_predict",
+ argumentExprs = exprs
+ )
}
/**
- * @see hivemall.tools.array.SelectKBestUDF
+ * @see [[hivemall.tools.array.SelectKBestUDF]]
* @group tools.array
*/
def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr {
- HiveGenericUDF("select_k_best",
- new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"),
- Seq(X.expr, importanceList.expr, k.expr))
+ planHiveGenericUDF(
+ className = "hivemall.tools.array.SelectKBestUDF",
+ funcName = "select_k_best",
+ argumentExprs = Seq(X, importanceList, k)
+ )
}
/**
- * @see hivemall.tools.math.SigmoidUDF
+ * @see [[hivemall.tools.math.SigmoidGenericUDF]]
* @group misc
*/
def sigmoid(expr: Column): Column = {
@@ -1313,11 +1336,15 @@ object HivemallOps {
}
/**
- * @see hivemall.tools.mapred.RowIdUDF
+ * @see [[hivemall.tools.mapred.RowIdUDFWrapper]]
* @group misc
*/
def rowid(): Column = withExpr {
- HiveGenericUDF("rowid", new HiveFunctionWrapper("hivemall.tools.mapred.RowIdUDFWrapper"), Nil)
+ planHiveGenericUDF(
+ className = "hivemall.tools.mapred.RowIdUDFWrapper",
+ funcName = "rowid",
+ argumentExprs = Nil
+ )
}.as("rowid")
/**
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
new file mode 100644
index 0000000..ab5c5fb
--- /dev/null
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.hive.internal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+
+/**
+ * Implementation side of [[org.apache.spark.sql.hive.HivemallOps]].
+ *
+ * Every Hive UDF/UDTF call that HivemallOps exposes is planned through the methods below. They rely
+ * on internal Spark classes (`HiveSimpleUDF`, `HiveGenericUDF`, `HiveGenericUDTF`, `Generate`)
+ * whose interfaces are unstable and may evolve in upcoming releases, so keeping every use of them
+ * here isolates those unstable parts from [[org.apache.spark.sql.hive.HivemallOps]].
+ */
+private[hive] object HivemallOpsImpl extends Logging {
+
+  /** Wraps the Hive function class named `className` for use inside a Catalyst expression. */
+  private[this] def functionOf(className: String): HiveFunctionWrapper =
+    new HiveFunctionWrapper(className)
+
+  /** Unwraps the Catalyst expression behind each argument column, preserving their order. */
+  private[this] def childrenOf(columns: Seq[Column]): Seq[Expression] =
+    columns.map(_.expr)
+
+  /**
+   * Builds a `HiveSimpleUDF` expression calling the Hive UDF class `className`.
+   *
+   * @param className fully qualified name of the Hive UDF class
+   * @param funcName function name recorded in the expression
+   * @param argumentExprs argument columns, in call order
+   * @return the `HiveSimpleUDF` expression
+   */
+  def planHiveUDF(
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column]): Expression =
+    HiveSimpleUDF(funcName, functionOf(className), childrenOf(argumentExprs))
+
+  /**
+   * Builds a `HiveGenericUDF` expression calling the Hive generic UDF class `className`.
+   *
+   * @param className fully qualified name of the Hive generic UDF class
+   * @param funcName function name recorded in the expression
+   * @param argumentExprs argument columns, in call order
+   * @return the `HiveGenericUDF` expression
+   */
+  def planHiveGenericUDF(
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column]): Expression =
+    HiveGenericUDF(funcName, functionOf(className), childrenOf(argumentExprs))
+
+  /**
+   * Builds a `Generate` node that applies the Hive generic UDTF class `className` to `df`.
+   * The node is created with `join = false` and `outer = false`.
+   *
+   * @param df input whose logical plan becomes the child of the `Generate` node
+   * @param className fully qualified name of the Hive generic UDTF class
+   * @param funcName function name recorded in the generator
+   * @param argumentExprs argument columns, in call order
+   * @param outputAttrNames names given to the generated output attributes
+   * @return the `Generate` logical plan
+   */
+  def planHiveGenericUDTF(
+      df: DataFrame,
+      className: String,
+      funcName: String,
+      argumentExprs: Seq[Column],
+      outputAttrNames: Seq[String]): LogicalPlan = {
+    val udtf = HiveGenericUDTF(funcName, functionOf(className), childrenOf(argumentExprs))
+    val outputs = outputAttrNames.map(name => UnresolvedAttribute(name))
+    Generate(
+      generator = udtf,
+      join = false,
+      outer = false,
+      qualifier = None,
+      generatorOutput = outputs,
+      child = df.logicalPlan)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 49773cc..493feda 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -357,7 +357,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
testData("features"), lit(true)).as("predicted")
)
.groupBy($"rowid")
- .rf_ensemble("predicted").as("rowid", "predicted")
+ .rf_ensemble("predicted").toDF("rowid", "predicted")
.select($"predicted.label")
checkAnswer(predicted, Seq(Row(0), Row(1)))
@@ -447,7 +447,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
val testDf = Seq(
(Array(0.3, 0.1, 0.2), 1),
(Array(0.3, 0.1, 0.2), 0),
- (Array(0.3, 0.1, 0.2), 0)).toDF.as("features", "label")
+ (Array(0.3, 0.1, 0.2), 0)).toDF("features", "label")
Seq(
"train_randomforest_regr",
"train_randomforest_classifier"
@@ -469,7 +469,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
} else {
res.groupBy("feature").argmin_kld("weight", "conv")
}
- }.as("feature", "weight")
+ }.toDF("feature", "weight")
// Data preparation
val testDf = LargeRegrTrainData
@@ -485,14 +485,14 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
.join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
.select($"rowid", ($"weight" * $"value").as("value"))
.groupBy("rowid").sum("value")
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
// Evaluation
val eval = predict
.join(testDf, predict("rowid") === testDf("rowid"))
.groupBy()
.agg(Map("target" -> "avg", "predicted" -> "avg"))
- .as("target", "predicted")
+ .toDF("target", "predicted")
val diff = eval.map {
case Row(target: Double, predicted: Double) =>
@@ -514,7 +514,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
} else {
res.groupBy("feature").argmin_kld("weight", "conv")
}
- }.as("feature", "weight")
+ }.toDF("feature", "weight")
// Data preparation
val testDf = LargeClassifierTestData
@@ -537,7 +537,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
* Perhaps you need to use aliases.
*/
.select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
// Evaluation
val eval = predict
@@ -586,31 +586,32 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
test("user-defined aggregators for ensembles") {
import hiveContext.implicits._
+ // Columns are named via Dataset.toDF(...) now that the HivemallOps.as(colNames*) wrapper is gone.
+ // NOTE(review): rowN(0)/rowN(1) assume groupBy(...).collect returns rows ordered by c0, which Spark
+ // does not guarantee; consider .orderBy($"c0") before .collect for the multi-group cases.
- val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.as("c0", "c1")
+ val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF("c0", "c1")
val row1 = df1.groupBy($"c0").voted_avg("c1").collect
assert(row1(0).getDouble(1) ~== 0.15)
assert(row1(1).getDouble(1) ~== 0.10)
- val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.as("c0", "c1")
+ val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF("c0", "c1")
val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect
assert(row3(0).getDouble(1) ~== 0.50)
assert(row3(1).getDouble(1) ~== 0.30)
- val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.as("c0", "c1", "c2")
+ val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF("c0", "c1", "c2")
val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect
assert(row5(0).getFloat(1) ~== 0.266666666)
assert(row5(1).getFloat(1) ~== 0.80)
- val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.as("c0", "c1", "c2")
+ val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF("c0", "c1", "c2")
val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect
assert(row6(0).getString(1) == "id-1")
- val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.as("c0", "c1", "c2")
- val row7 = df7.groupBy($"c0").maxrow("c2", "c1").as("c0", "c1").select($"c1.col1").collect
+ val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF("c0", "c1", "c2")
+ val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect
assert(row7(0).getString(0) == "id-0")
- val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.as("c0", "c1")
- val row8 = df8.groupBy($"c0").rf_ensemble("c1").as("c0", "c1").select("c1.probability").collect
+ val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1")
+ val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1")
+ .select("c1.probability").collect
assert(row8(0).getDouble(0) ~== 0.3333333333)
assert(row8(1).getDouble(0) ~== 1.0)
}
@@ -618,20 +619,20 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
test("user-defined aggregators for evaluation") {
import hiveContext.implicits._
+ // Columns are named via Dataset.toDF(...) now that the HivemallOps.as(colNames*) wrapper is gone.
- val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF.as("c0", "c1", "c2")
+ val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF("c0", "c1", "c2")
val row1 = df1.groupBy($"c0").mae("c1", "c2").collect
assert(row1(0).getDouble(1) ~== 0.26666666)
- val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
+ val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
val row2 = df2.groupBy($"c0").mse("c1", "c2").collect
assert(row2(0).getDouble(1) ~== 0.29999999)
- val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
+ val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
val row3 = df3.groupBy($"c0").rmse("c1", "c2").collect
assert(row3(0).getDouble(1) ~== 0.54772253)
+ // NOTE(review): the bare .toDF before .toDF("c0", "c1", "c2") below is redundant.
val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF
- .as("c0", "c1", "c2")
+ .toDF("c0", "c1", "c2")
val row4 = df4.groupBy($"c0").f1score("c1", "c2").collect
assert(row4(0).getDouble(1) ~== 0.25)
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
index 3b87a96..06a4dc0 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
@@ -170,7 +170,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
} else {
res.groupBy("feature").argmin_kld("weight", "conv")
}
- }.as("feature", "weight")
+ }.toDF("feature", "weight")
// Data preparation
val testDf = testA9aData
@@ -186,14 +186,14 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
.join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
.select($"rowid", ($"weight" * $"value").as("value"))
.groupBy("rowid").sum("value")
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
// Evaluation
val eval = predict
.join(testDf, predict("rowid") === testDf("rowid"))
.groupBy()
.agg(Map("target" -> "avg", "predicted" -> "avg"))
- .as("target", "predicted")
+ .toDF("target", "predicted")
val (target, predicted) = eval.map {
case Row(target: Double, predicted: Double) => (target, predicted)
@@ -238,7 +238,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
} else {
res.groupBy("feature").argmin_kld("weight", "conv")
}
- }.as("feature", "weight")
+ }.toDF("feature", "weight")
// Data preparation
val testDf = testKdd2010aData
@@ -255,7 +255,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
.select($"rowid", ($"weight" * $"value").as("value"))
.groupBy("rowid").sum("value")
.select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
// Evaluation
val eval = predict
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
index 8c9c0c3..37e0989 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
@@ -75,7 +75,7 @@ final class XGBoostSuite extends VectorQueryTest {
val predict = model.join(mllibTestDf)
.xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
.groupBy("rowid").avg()
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
.select(predict("rowid"), $"predicted", $"label")
@@ -100,7 +100,7 @@ final class XGBoostSuite extends VectorQueryTest {
val predict = model.join(mllibTestDf)
.xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
.groupBy("rowid").avg()
- .as("rowid", "predicted")
+ .toDF("rowid", "predicted")
val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
.select(
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/88fae520/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
index 598479d..b15c77c 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
@@ -143,7 +143,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
.select($"rowid", ($"weight" * $"value").as("value"))
.groupBy("rowid").sum("value")
- .as("rowid", "value")
+ .toDF("rowid", "value")
.select($"rowid", sigmoid($"value"))
}