Posted to commits@hivemall.apache.org by my...@apache.org on 2017/09/13 12:18:27 UTC
[3/4] incubator-hivemall git commit: Close #122:
[HIVEMALL-133][SPARK] Support spark-v2.2 in the hivemall-spark module
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala
new file mode 100644
index 0000000..75f4bfc
--- /dev/null
+++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.RelationalGroupedDataset
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.Pivot
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+import org.apache.spark.sql.types._
+
+/**
+ * Wraps a [[RelationalGroupedDataset]] so that Hivemall aggregate functions can be run
+ * on a [[DataFrame]] grouped by the specified columns.
+ *
+ * @groupname ensemble
+ * @groupname ftvec.trans
+ * @groupname evaluation
+ */
+final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) {
+
+ /**
+ * @see hivemall.ensemble.bagging.VotedAvgUDAF
+ * @group ensemble
+ */
+ def voted_avg(weight: String): DataFrame = {
+ // checkType(weight, NumericType)
+ val udaf = HiveUDAFFunction(
+ "voted_avg",
+ new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
+ Seq(weight).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF
+ * @group ensemble
+ */
+ def weight_voted_avg(weight: String): DataFrame = {
+ // checkType(weight, NumericType)
+ val udaf = HiveUDAFFunction(
+ "weight_voted_avg",
+ new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
+ Seq(weight).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ensemble.ArgminKLDistanceUDAF
+ * @group ensemble
+ */
+ def argmin_kld(weight: String, conv: String): DataFrame = {
+ // checkType(weight, NumericType)
+ // checkType(conv, NumericType)
+ val udaf = HiveUDAFFunction(
+ "argmin_kld",
+ new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"),
+ Seq(weight, conv).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ensemble.MaxValueLabelUDAF
+ * @group ensemble
+ */
+ def max_label(score: String, label: String): DataFrame = {
+ // checkType(score, NumericType)
+ checkType(label, StringType)
+ val udaf = HiveUDAFFunction(
+ "max_label",
+ new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"),
+ Seq(score, label).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ensemble.MaxRowUDAF
+ * @group ensemble
+ */
+ def maxrow(score: String, label: String): DataFrame = {
+ // checkType(score, NumericType)
+ checkType(label, StringType)
+ val udaf = HiveUDAFFunction(
+ "maxrow",
+ new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"),
+ Seq(score, label).map(df.col(_).expr),
+ isUDAFBridgeRequired = false)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.smile.tools.RandomForestEnsembleUDAF
+ * @group ensemble
+ */
+ def rf_ensemble(predict: String): DataFrame = {
+ // checkType(predict, NumericType)
+ val udaf = HiveUDAFFunction(
+ "rf_ensemble",
+ new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"),
+ Seq(predict).map(df.col(_).expr),
+ isUDAFBridgeRequired = false)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.tools.matrix.TransposeAndDotUDAF
+ */
+ def transpose_and_dot(X: String, Y: String): DataFrame = {
+ val udaf = HiveUDAFFunction(
+ "transpose_and_dot",
+ new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"),
+ Seq(X, Y).map(df.col(_).expr),
+ isUDAFBridgeRequired = false)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ftvec.trans.OnehotEncodingUDAF
+ * @group ftvec.trans
+ */
+ def onehot_encoding(cols: String*): DataFrame = {
+ val udaf = HiveUDAFFunction(
+ "onehot_encoding",
+ new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"),
+ cols.map(df.col(_).expr),
+ isUDAFBridgeRequired = false)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
+ */
+ def snr(X: String, Y: String): DataFrame = {
+ val udaf = HiveUDAFFunction(
+ "snr",
+ new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"),
+ Seq(X, Y).map(df.col(_).expr),
+ isUDAFBridgeRequired = false)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.evaluation.MeanAbsoluteErrorUDAF
+ * @group evaluation
+ */
+ def mae(predict: String, target: String): DataFrame = {
+ checkType(predict, FloatType)
+ checkType(target, FloatType)
+ val udaf = HiveUDAFFunction(
+ "mae",
+ new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"),
+ Seq(predict, target).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.evaluation.MeanSquaredErrorUDAF
+ * @group evaluation
+ */
+ def mse(predict: String, target: String): DataFrame = {
+ checkType(predict, FloatType)
+ checkType(target, FloatType)
+ val udaf = HiveUDAFFunction(
+ "mse",
+ new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"),
+ Seq(predict, target).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.evaluation.RootMeanSquaredErrorUDAF
+ * @group evaluation
+ */
+ def rmse(predict: String, target: String): DataFrame = {
+ checkType(predict, FloatType)
+ checkType(target, FloatType)
+ val udaf = HiveUDAFFunction(
+ "rmse",
+ new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"),
+ Seq(predict, target).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * @see hivemall.evaluation.FMeasureUDAF
+ * @group evaluation
+ */
+ def f1score(predict: String, target: String): DataFrame = {
+ // checkType(target, ArrayType(IntegerType))
+ // checkType(predict, ArrayType(IntegerType))
+ val udaf = HiveUDAFFunction(
+ "f1score",
+ new HiveFunctionWrapper("hivemall.evaluation.FMeasureUDAF"),
+ Seq(predict, target).map(df.col(_).expr),
+ isUDAFBridgeRequired = true)
+ .toAggregateExpression()
+ toDF(Seq(Alias(udaf, udaf.prettyName)()))
+ }
+
+ /**
+ * [[RelationalGroupedDataset]] holds the following three values as private fields, so, to
+ * inject Hivemall aggregate functions, we fetch them via Java reflection.
+ */
+ private val df = getPrivateField[DataFrame]("org$apache$spark$sql$RelationalGroupedDataset$$df")
+ private val groupingExprs = getPrivateField[Seq[Expression]]("groupingExprs")
+ private val groupType = getPrivateField[RelationalGroupedDataset.GroupType]("groupType")
+
+ private def getPrivateField[T](name: String): T = {
+ val field = groupBy.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.get(groupBy).asInstanceOf[T]
+ }
+
+ private def toDF(aggExprs: Seq[Expression]): DataFrame = {
+ val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
+ groupingExprs ++ aggExprs
+ } else {
+ aggExprs
+ }
+
+ val aliasedAgg = aggregates.map(alias)
+
+ groupType match {
+ case RelationalGroupedDataset.GroupByType =>
+ Dataset.ofRows(
+ df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
+ case RelationalGroupedDataset.RollupType =>
+ Dataset.ofRows(
+ df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
+ case RelationalGroupedDataset.CubeType =>
+ Dataset.ofRows(
+ df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
+ case RelationalGroupedDataset.PivotType(pivotCol, values) =>
+ val aliasedGrps = groupingExprs.map(alias)
+ Dataset.ofRows(
+ df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
+ }
+ }
+
+ private def alias(expr: Expression): NamedExpression = expr match {
+ case u: UnresolvedAttribute => UnresolvedAlias(u)
+ case expr: NamedExpression => expr
+ case expr: Expression => Alias(expr, expr.prettyName)()
+ }
+
+ private def checkType(colName: String, expected: DataType) = {
+ val dataType = df.resolve(colName).dataType
+ if (dataType != expected) {
+ throw new AnalysisException(
+ s""""$colName" must be $expected, however it is $dataType""")
+ }
+ }
+}
+
+object HivemallGroupedDataset {
+
+ /**
+ * Implicitly inject the [[HivemallGroupedDataset]] into [[RelationalGroupedDataset]].
+ */
+ implicit def relationalGroupedDatasetToHivemallOne(
+ groupBy: RelationalGroupedDataset): HivemallGroupedDataset = {
+ new HivemallGroupedDataset(groupBy)
+ }
+}
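
For reference, a minimal usage sketch of the implicit conversion above (the
DataFrame and its column names are illustrative, not part of this patch):

    import org.apache.spark.sql.hive.HivemallGroupedDataset._

    // groupBy() returns a RelationalGroupedDataset, which the implicit above
    // converts so that Hivemall UDAFs such as voted_avg() become available.
    val avgDf = df.groupBy("label").voted_avg("weight")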
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
new file mode 100644
index 0000000..22d3153
--- /dev/null
+++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -0,0 +1,1538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.UUID
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.HivemallFeature
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, JoinTopK, LogicalPlan}
+import org.apache.spark.sql.execution.UserProvidedPlanner
+import org.apache.spark.sql.execution.datasources.csv.{CsvToStruct, StructToCsv}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Hivemall wrapper functions and some utilities for [[DataFrame]].
+ *
+ * @groupname regression
+ * @groupname classifier
+ * @groupname classifier.multiclass
+ * @groupname xgboost
+ * @groupname anomaly
+ * @groupname knn.similarity
+ * @groupname knn.distance
+ * @groupname knn.lsh
+ * @groupname ftvec
+ * @groupname ftvec.amplify
+ * @groupname ftvec.hashing
+ * @groupname ftvec.scaling
+ * @groupname ftvec.conv
+ * @groupname ftvec.trans
+ * @groupname misc
+ */
+final class HivemallOps(df: DataFrame) extends Logging {
+ import internal.HivemallOpsImpl._
+
+ private[this] lazy val _sparkSession = df.sparkSession
+ private[this] lazy val _analyzer = _sparkSession.sessionState.analyzer
+ private[this] lazy val _strategy = new UserProvidedPlanner(_sparkSession.sqlContext.conf)
+
+ /**
+ * @see [[hivemall.regression.AdaDeltaUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_adadelta(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AdaDeltaUDTF",
+ "train_adadelta",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.AdaGradUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_adagrad(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AdaGradUDTF",
+ "train_adagrad",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.AROWRegressionUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF",
+ "train_arow_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF$AROWe",
+ "train_arowe_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.AROWRegressionUDTF$AROWe2",
+ "train_arowe2_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.LogressUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_logregr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.LogressUDTF",
+ "train_logregr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF",
+ "train_pa1_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a",
+ "train_pa1a_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2",
+ "train_pa2_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a",
+ "train_pa2a_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]]
+ * @group regression
+ */
+ @scala.annotation.varargs
+ def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.smile.regression.RandomForestRegressionUDTF",
+ "train_randomforest_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.PerceptronUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_perceptron(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PerceptronUDTF",
+ "train_perceptron",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_pa(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF",
+ "train_pa",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_pa1(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF$PA1",
+ "train_pa1",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_pa2(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.PassiveAggressiveUDTF$PA2",
+ "train_pa2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.ConfidenceWeightedUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_cw(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.ConfidenceWeightedUDTF",
+ "train_cw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.AROWClassifierUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_arow(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AROWClassifierUDTF",
+ "train_arow",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_arowh(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AROWClassifierUDTF$AROWh",
+ "train_arowh",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_scw(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1",
+ "train_scw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW2]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_scw2(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2",
+ "train_scw2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.AdaGradRDAUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.AdaGradRDAUDTF",
+ "train_adagrad_rda",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]]
+ * @group classifier
+ */
+ @scala.annotation.varargs
+ def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.smile.classification.RandomForestClassifierUDTF",
+ "train_randomforest_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_perceptron(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPerceptronUDTF",
+ "train_multiclass_perceptron",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF",
+ "train_multiclass_pa",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1",
+ "train_multiclass_pa1",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2",
+ "train_multiclass_pa2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF",
+ "train_multiclass_cw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF",
+ "train_multiclass_arow",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1",
+ "train_multiclass_scw",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]]
+ * @group classifier.multiclass
+ */
+ @scala.annotation.varargs
+ def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2",
+ "train_multiclass_scw2",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "feature", "weight", "conv")
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]]
+ * @group xgboost
+ */
+ @Experimental
+ @scala.annotation.varargs
+ def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.regression.XGBoostRegressionUDTF",
+ "train_xgboost_regr",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]]
+ * @group xgboost
+ */
+ @Experimental
+ @scala.annotation.varargs
+ def train_xgboost_classifier(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF",
+ "train_xgboost_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]]
+ * @group xgboost
+ */
+ @Experimental
+ @scala.annotation.varargs
+ def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF",
+ "train_xgboost_multiclass_classifier",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("model_id", "pred_model")
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]]
+ * @group xgboost
+ */
+ @Experimental
+ @scala.annotation.varargs
+ def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.tools.XGBoostPredictUDTF",
+ "xgboost_predict",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("rowid", "predicted")
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]]
+ * @group xgboost
+ */
+ @Experimental
+ @scala.annotation.varargs
+ def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF",
+ "xgboost_multiclass_predict",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("rowid", "label", "probability")
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.lsh.MinHashUDTF]]
+ * @group knn.lsh
+ */
+ @scala.annotation.varargs
+ def minhash(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.knn.lsh.MinHashUDTF",
+ "minhash",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("clusterid", "item")
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.amplify.AmplifierUDTF]]
+ * @group ftvec.amplify
+ */
+ @scala.annotation.varargs
+ def amplify(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.amplify.AmplifierUDTF",
+ "amplify",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("clusterid", "item")
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]]
+ * @group ftvec.amplify
+ */
+ @scala.annotation.varargs
+ def rand_amplify(exprs: Column*): DataFrame = withTypedPlan {
+ throw new UnsupportedOperationException("`rand_amplify` not supported yet")
+ }
+
+ /**
+ * Amplifies and shuffles data inside partitions.
+ * @group ftvec.amplify
+ */
+ def part_amplify(xtimes: Column): DataFrame = {
+ val xtimesInt = xtimes.expr match {
+ case Literal(v: Any, IntegerType) => v.asInstanceOf[Int]
+ case e => throw new AnalysisException("`xtimes` must be integer, however " + e)
+ }
+ val rdd = df.rdd.mapPartitions({ iter =>
+ val elems = iter.flatMap{ row =>
+ Seq.fill[Row](xtimesInt)(row)
+ }
+ // Need to check how this shuffling affects results
+ scala.util.Random.shuffle(elems)
+ }, true)
+ df.sqlContext.createDataFrame(rdd, df.schema)
+ }
+
+ /**
+ * Quantifies input columns.
+ * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]]
+ * @group ftvec.conv
+ */
+ @scala.annotation.varargs
+ def quantify(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.conv.QuantifyColumnsUDTF",
+ "quantify",
+ setMixServs(toHivemallFeatures(exprs)),
+ (0 until exprs.size - 1).map(i => s"c$i")
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def binarize_label(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.trans.BinarizeLabelUDTF",
+ "binarize_label",
+ setMixServs(toHivemallFeatures(exprs)),
+ (0 until exprs.size - 1).map(i => s"c$i")
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def quantified_features(exprs: Column*): DataFrame = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.ftvec.trans.QuantifiedFeaturesUDTF",
+ "quantified_features",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("features")
+ )
+ }
+
+ /**
+ * Explodes a Seq[String] column into one [[HivemallFeature]] row per element.
+ * @group ftvec
+ */
+ def explode_array(expr: Column): DataFrame = {
+ df.explode(expr) { case Row(v: Seq[_]) =>
+ // Type erasure removes the component type in Seq
+ v.map(s => HivemallFeature(s.asInstanceOf[String]))
+ }
+ }
+
+ /**
+ * Explodes a [[Vector]] column into (feature, weight) rows, one per element.
+ * @group ftvec
+ */
+ def explode_vector(expr: Column): DataFrame = {
+ val elementSchema = StructType(
+ StructField("feature", StringType) :: StructField("weight", DoubleType) :: Nil)
+ val explodeFunc: Row => TraversableOnce[InternalRow] = (row: Row) => {
+ row.get(0) match {
+ case dv: DenseVector =>
+ dv.values.zipWithIndex.map {
+ case (value, index) =>
+ InternalRow(UTF8String.fromString(s"$index"), value)
+ }
+ case sv: SparseVector =>
+ sv.values.zip(sv.indices).map {
+ case (value, index) =>
+ InternalRow(UTF8String.fromString(s"$index"), value)
+ }
+ }
+ }
+ withTypedPlan {
+ Generate(
+ UserDefinedGenerator(elementSchema, explodeFunc, expr.expr :: Nil),
+ join = true, outer = false, None,
+ generatorOutput = Nil,
+ df.logicalPlan)
+ }
+ }
+
+ /**
+ * Returns `top-k` records for each `group`.
+ * @group misc
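+ * For example, `df.each_top_k(lit(2), $"score", $"key")` keeps the two rows with the
+ * highest `score` per `key` and prepends a `rank` column (an illustrative sketch;
+ * `lit` comes from `org.apache.spark.sql.functions`).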
+ */
+ def each_top_k(k: Column, score: Column, group: Column*): DataFrame = withTypedPlan {
+ val kInt = k.expr match {
+ case Literal(v: Any, IntegerType) => v.asInstanceOf[Int]
+ case e => throw new AnalysisException("`k` must be integer, however " + e)
+ }
+ if (kInt == 0) {
+ throw new AnalysisException("`k` must not have 0")
+ }
+ val clusterDf = df.repartition(group: _*).sortWithinPartitions(group: _*)
+ .select(score, Column("*"))
+ val analyzedPlan = clusterDf.queryExecution.analyzed
+ val inputAttrs = analyzedPlan.output
+ val scoreExpr = BindReferences.bindReference(analyzedPlan.expressions.head, inputAttrs)
+ val groupNames = group.map { _.expr match {
+ case ne: NamedExpression => ne.name
+ case ua: UnresolvedAttribute => ua.name
+ }}
+ val groupExprs = analyzedPlan.expressions.filter {
+ case ne: NamedExpression => groupNames.contains(ne.name)
+ }.map { e =>
+ BindReferences.bindReference(e, inputAttrs)
+ }
+ val rankField = StructField("rank", IntegerType)
+ Generate(
+ generator = EachTopK(
+ k = kInt,
+ scoreExpr = scoreExpr,
+ groupExprs = groupExprs,
+ elementSchema = StructType(
+ rankField +: inputAttrs.map(d => StructField(d.name, d.dataType))
+ ),
+ children = inputAttrs
+ ),
+ join = false,
+ outer = false,
+ qualifier = None,
+ generatorOutput = Seq(rankField.name).map(UnresolvedAttribute(_)) ++ inputAttrs,
+ child = analyzedPlan
+ )
+ }
+
+ /**
+ * :: Experimental ::
+ * Joins two input tables with the given keys, keeping only the top-k rows by `score`.
+ * @group misc
+ */
+ @Experimental
+ def top_k_join(k: Column, right: DataFrame, joinExprs: Column, score: Column)
+ : DataFrame = withTypedPlanInCustomStrategy {
+ val kInt = k.expr match {
+ case Literal(v: Any, IntegerType) => v.asInstanceOf[Int]
+ case e => throw new AnalysisException("`k` must be integer, however " + e)
+ }
+ if (kInt == 0) {
+ throw new AnalysisException("`k` must not have 0")
+ }
+ JoinTopK(kInt, df.logicalPlan, right.logicalPlan, Inner, Option(joinExprs.expr))(score.named)
+ }
+
+ private def doFlatten(schema: StructType, separator: Char, prefixParts: Seq[String] = Seq.empty)
+ : Seq[Column] = {
+ schema.fields.flatMap { f =>
+ val colNameParts = prefixParts :+ f.name
+ f.dataType match {
+ case st: StructType =>
+ doFlatten(st, separator, colNameParts)
+ case _ =>
+ col(colNameParts.mkString(".")).as(colNameParts.mkString(separator.toString)) :: Nil
+ }
+ }
+ }
+
+ // Converts the string representation of a separator character into the actual character
+ @throws[IllegalArgumentException]
+ private def toChar(str: String): Char = {
+ if (str.length == 1) {
+ str.charAt(0) match {
+ case '$' | '_' | '.' => str.charAt(0)
+ case _ => throw new IllegalArgumentException(
+ "Must use '$', '_', or '.' for separator, but got " + str)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ s"Separator cannot be more than one character: $str")
+ }
+ }
+
+ /**
+ * Flattens a nested schema into a flat one.
+ * @group misc
+ *
+ * For example:
+ * {{{
+ * scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF()
+ * scala> df.printSchema
+ * root
+ * |-- _1: integer (nullable = false)
+ * |-- _2: struct (nullable = true)
+ * | |-- _1: integer (nullable = false)
+ * | |-- _2: struct (nullable = true)
+ * | | |-- _1: double (nullable = false)
+ * | | |-- _2: string (nullable = true)
+ * |-- _3: struct (nullable = true)
+ * | |-- _1: integer (nullable = false)
+ * | |-- _2: double (nullable = false)
+ *
+ * scala> df.flatten(separator = "$").printSchema
+ * root
+ * |-- _1: integer (nullable = false)
+ * |-- _2$_1: integer (nullable = true)
+ * |-- _2$_2$_1: double (nullable = true)
+ * |-- _2$_2$_2: string (nullable = true)
+ * |-- _3$_1: integer (nullable = true)
+ * |-- _3$_2: double (nullable = true)
+ * }}}
+ */
+ def flatten(separator: String = "$"): DataFrame =
+ df.select(doFlatten(df.schema, toChar(separator)): _*)
+
+ /**
+ * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]]
+ * @group misc
+ */
+ @scala.annotation.varargs
+ def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan {
+ planHiveGenericUDTF(
+ df,
+ "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper",
+ "lr_datagen",
+ setMixServs(toHivemallFeatures(exprs)),
+ Seq("label", "features")
+ )
+ }
+
+ /**
+ * Returns all the columns in this [[DataFrame]] as a Seq[Column].
+ */
+ private[sql] def cols: Seq[Column] = {
+ df.schema.fields.map(col => df.col(col.name)).toSeq
+ }
+
+ /**
+ * :: Experimental ::
+ * If the '-mix' option is not given in the third argument,
+ * set it from the environment variable
+ * 'HIVEMALL_MIX_SERVERS'.
+ *
+ * TODO: This only works if '--deploy-mode' is 'client';
+ * otherwise, we need to set HIVEMALL_MIX_SERVERS
+ * on all possible Spark workers.
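+ *
+ * e.g., HIVEMALL_MIX_SERVERS=host1:11212,host2:11212 (the comma-separated
+ * host:port format shown here is an assumption, not defined in this patch).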
+ */
+ @Experimental
+ private[this] def setMixServs(exprs: Seq[Column]): Seq[Column] = {
+ val mixes = System.getenv("HIVEMALL_MIX_SERVERS")
+ if (mixes != null && !mixes.isEmpty()) {
+ val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID
+ logInfo(s"set '${mixes}' as default mix servers (session: ${groupId})")
+ exprs.size match {
+ case 2 => exprs :+ Column(
+ Literal.create(s"-mix ${mixes} -mix_session ${groupId}", StringType))
+ // TODO: Add code for the case where exprs.size == 3.
+ case _ => exprs
+ }
+ } else {
+ exprs
+ }
+ }
+
+ /**
+ * If the input is a [[Vector]], transform it into Hivemall features.
+ */
+ @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = {
+ df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map {
+ case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c)
+ case (_, c) => c
+ }
+ }
+
+ /**
+ * A convenient function to wrap a logical plan and produce a DataFrame.
+ */
+ @inline private[this] def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = {
+ val queryExecution = _sparkSession.sessionState.executePlan(logicalPlan)
+ val outputSchema = queryExecution.sparkPlan.schema
+ new Dataset[Row](df.sparkSession, queryExecution, RowEncoder(outputSchema))
+ }
+
+ @inline private[this] def withTypedPlanInCustomStrategy(logicalPlan: => LogicalPlan)
+ : DataFrame = {
+ // Inject custom strategies
+ if (!_sparkSession.experimental.extraStrategies.contains(_strategy)) {
+ _sparkSession.experimental.extraStrategies = Seq(_strategy)
+ }
+ withTypedPlan(logicalPlan)
+ }
+}
+
+object HivemallOps {
+ import internal.HivemallOpsImpl._
+
+ /**
+ * Implicitly inject the [[HivemallOps]] into [[DataFrame]].
+ */
+ implicit def dataFrameToHivemallOps(df: DataFrame): HivemallOps =
+ new HivemallOps(df)
+
+ /**
+ * @see [[hivemall.HivemallVersionUDF]]
+ * @group misc
+ */
+ def hivemall_version(): Column = withExpr {
+ planHiveUDF(
+ "hivemall.HivemallVersionUDF",
+ "hivemall_version",
+ Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.anomaly.ChangeFinderUDF]]
+ * @group anomaly
+ */
+ @scala.annotation.varargs
+ def changefinder(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.anomaly.ChangeFinderUDF",
+ "changefinder",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.anomaly.SingularSpectrumTransformUDF]]
+ * @group anomaly
+ */
+ @scala.annotation.varargs
+ def sst(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.anomaly.SingularSpectrumTransformUDF",
+ "sst",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.similarity.CosineSimilarityUDF]]
+ * @group knn.similarity
+ */
+ @scala.annotation.varargs
+ def cosine_sim(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.similarity.CosineSimilarityUDF",
+ "cosine_sim",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.similarity.JaccardIndexUDF]]
+ * @group knn.similarity
+ */
+ @scala.annotation.varargs
+ def jaccard(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.knn.similarity.JaccardIndexUDF",
+ "jaccard",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.similarity.AngularSimilarityUDF]]
+ * @group knn.similarity
+ */
+ @scala.annotation.varargs
+ def angular_similarity(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.similarity.AngularSimilarityUDF",
+ "angular_similarity",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.similarity.EuclidSimilarity]]
+ * @group knn.similarity
+ */
+ @scala.annotation.varargs
+ def euclid_similarity(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.similarity.EuclidSimilarity",
+ "euclid_similarity",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.similarity.Distance2SimilarityUDF]]
+ * @group knn.similarity
+ */
+ @scala.annotation.varargs
+ def distance2similarity(exprs: Column*): Column = withExpr {
+ // TODO: Need a wrapper class because of using unsupported types
+ planHiveGenericUDF(
+ "hivemall.knn.similarity.Distance2SimilarityUDF",
+ "distance2similarity",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.HammingDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def hamming_distance(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.knn.distance.HammingDistanceUDF",
+ "hamming_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.PopcountUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def popcnt(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.knn.distance.PopcountUDF",
+ "popcnt",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.KLDivergenceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def kld(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.knn.distance.KLDivergenceUDF",
+ "kld",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.EuclidDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def euclid_distance(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.distance.EuclidDistanceUDF",
+ "euclid_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.CosineDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def cosine_distance(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.distance.CosineDistanceUDF",
+ "cosine_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.AngularDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def angular_distance(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.distance.AngularDistanceUDF",
+ "angular_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.ManhattanDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def manhattan_distance(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.distance.ManhattanDistanceUDF",
+ "manhattan_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]]
+ * @group knn.distance
+ */
+ @scala.annotation.varargs
+ def minkowski_distance(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.distance.MinkowskiDistanceUDF",
+ "minkowski_distance",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.lsh.bBitMinHashUDF]]
+ * @group knn.lsh
+ */
+ @scala.annotation.varargs
+ def bbit_minhash(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.knn.lsh.bBitMinHashUDF",
+ "bbit_minhash",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]]
+ * @group knn.lsh
+ */
+ @scala.annotation.varargs
+ def minhashes(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.knn.lsh.MinHashesUDFWrapper",
+ "minhashes",
+ exprs
+ )
+ }
+
+ /**
+ * Returns new features with `1.0` (bias) appended to the input features.
+ * @see [[hivemall.ftvec.AddBiasUDFWrapper]]
+ * @group ftvec
+ */
+ def add_bias(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.AddBiasUDFWrapper",
+ "add_bias",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]]
+ * @group ftvec
+ *
+ * TODO: This throws java.lang.ClassCastException because
+ * HiveInspectors.toInspector has a bug in spark.
+ * Need to fix it later.
+ */
+ def extract_feature(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.ExtractFeatureUDFWrapper",
+ "extract_feature",
+ expr :: Nil
+ )
+ }.as("feature")
+
+ /**
+ * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]]
+ * @group ftvec
+ *
+ * TODO: This throws java.lang.ClassCastException because
+ * HiveInspectors.toInspector has a bug in spark.
+ * Need to fix it later.
+ */
+ def extract_weight(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.ExtractWeightUDFWrapper",
+ "extract_weight",
+ expr :: Nil
+ )
+ }.as("value")
+
+ /**
+ * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]]
+ * @group ftvec
+ */
+ def add_feature_index(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.AddFeatureIndexUDFWrapper",
+ "add_feature_index",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]]
+ * @group ftvec
+ */
+ def sort_by_feature(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.SortByFeatureUDFWrapper",
+ "sort_by_feature",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]]
+ * @group ftvec.hashing
+ */
+ def mhash(expr: Column): Column = withExpr {
+ planHiveUDF(
+ "hivemall.ftvec.hashing.MurmurHash3UDF",
+ "mhash",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.hashing.Sha1UDF]]
+ * @group ftvec.hashing
+ */
+ def sha1(expr: Column): Column = withExpr {
+ planHiveUDF(
+ "hivemall.ftvec.hashing.Sha1UDF",
+ "sha1",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]]
+ * @group ftvec.hashing
+ */
+ @scala.annotation.varargs
+ def array_hash_values(exprs: Column*): Column = withExpr {
+ // TODO: Need a wrapper class because of using unsupported types
+ planHiveUDF(
+ "hivemall.ftvec.hashing.ArrayHashValuesUDF",
+ "array_hash_values",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]]
+ * @group ftvec.hashing
+ */
+ @scala.annotation.varargs
+ def prefixed_hash_values(exprs: Column*): Column = withExpr {
+ // TODO: Need a wrapper class because of using unsupported types
+ planHiveUDF(
+ "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF",
+ "prefixed_hash_values",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.scaling.RescaleUDF]]
+ * @group ftvec.scaling
+ */
+ def rescale(value: Column, max: Column, min: Column): Column = withExpr {
+ planHiveUDF(
+ "hivemall.ftvec.scaling.RescaleUDF",
+ "rescale",
+ value.cast(FloatType) :: max :: min :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.scaling.ZScoreUDF]]
+ * @group ftvec.scaling
+ */
+ @scala.annotation.varargs
+ def zscore(exprs: Column*): Column = withExpr {
+ planHiveUDF(
+ "hivemall.ftvec.scaling.ZScoreUDF",
+ "zscore",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]]
+ * @group ftvec.scaling
+ */
+ def normalize(expr: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.scaling.L2NormalizationUDFWrapper",
+ "normalize",
+ expr :: Nil
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.selection.ChiSquareUDF]]
+ * @group ftvec.selection
+ */
+ def chi2(observed: Column, expected: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.selection.ChiSquareUDF",
+ "chi2",
+ Seq(observed, expected)
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]]
+ * @group ftvec.conv
+ */
+ @scala.annotation.varargs
+ def to_dense_features(exprs: Column*): Column = withExpr {
+ // TODO: Need a wrapper class because of using unsupported types
+ planHiveGenericUDF(
+ "hivemall.ftvec.conv.ToDenseFeaturesUDF",
+ "to_dense_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]]
+ * @group ftvec.conv
+ */
+ @scala.annotation.varargs
+ def to_sparse_features(exprs: Column*): Column = withExpr {
+ // TODO: Need a wrapper class because of using unsupported types
+ planHiveGenericUDF(
+ "hivemall.ftvec.conv.ToSparseFeaturesUDF",
+ "to_sparse_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def vectorize_features(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.trans.VectorizeFeaturesUDF",
+ "vectorize_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def categorical_features(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.trans.CategoricalFeaturesUDF",
+ "categorical_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.IndexedFeatures]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def indexed_features(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.trans.IndexedFeatures",
+ "indexed_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]]
+ * @group ftvec.trans
+ */
+ @scala.annotation.varargs
+ def quantitative_features(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.ftvec.trans.QuantitativeFeaturesUDF",
+ "quantitative_features",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.smile.tools.TreePredictUDF]]
+ * @group misc
+ */
+ @scala.annotation.varargs
+ def tree_predict(exprs: Column*): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.smile.tools.TreePredictUDF",
+ "tree_predict",
+ exprs
+ )
+ }
+
+ /**
+ * @see [[hivemall.tools.array.SelectKBestUDF]]
+ * @group tools.array
+ */
+ def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.tools.array.SelectKBestUDF",
+ "select_k_best",
+ Seq(X, importanceList, k)
+ )
+ }
+
+ /**
+ * @see [[hivemall.tools.math.SigmoidGenericUDF]]
+ * @group misc
+ */
+ def sigmoid(expr: Column): Column = {
+ val one = Column(Literal.create(1.0, DoubleType))
+ one / (one + exp(-expr))
+ }
+
+ /**
+ * @see [[hivemall.tools.mapred.RowIdUDFWrapper]]
+ * @group misc
+ */
+ def rowid(): Column = withExpr {
+ planHiveGenericUDF(
+ "hivemall.tools.mapred.RowIdUDFWrapper",
+ "rowid",
+ Nil
+ )
+ }.as("rowid")
+
+ /**
+ * Parses a column containing a CSV string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ * @group misc
+ *
+ * @param e a string column containing CSV data.
+ * @param schema the schema to use when parsing the csv string
+ * @param options options to control how the csv is parsed. Accepts the same options as the
+ * csv data source.
+ */
+ def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+ CsvToStruct(schema, options, e.expr)
+ }
+
+ /**
+ * Parses a column containing a CSV string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ * @group misc
+ *
+ * @param e a string column containing CSV data.
+ * @param schema the schema to use when parsing the csv string
+ */
+ def from_csv(e: Column, schema: StructType): Column =
+ from_csv(e, schema, Map.empty[String, String])
+
+ /**
+ * Converts a column containing a [[StructType]] into a CSV string with the specified schema.
+ * Throws an exception in the case of an unsupported type.
+ * @group misc
+ *
+ * @param e a struct column.
+ * @param options options to control how the struct column is converted into a csv string.
+ * Accepts the same options as the csv data source.
+ */
+ def to_csv(e: Column, options: Map[String, String]): Column = withExpr {
+ StructToCsv(options, e.expr)
+ }
+
+ /**
+ * Converts a column containing a [[StructType]] into a CSV string with the specified schema.
+ * Throws an exception in the case of an unsupported type.
+ * @group misc
+ *
+ * @param e a struct column.
+ */
+ def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String])
+
+ /**
+ * A convenient function to wrap an expression and produce a Column.
+ */
+ @inline private def withExpr(expr: Expression): Column = Column(expr)
+}
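
For reference, a minimal training sketch using the implicits above (the
DataFrame and its column names are illustrative, not part of this patch):

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.hive.HivemallOps._

    // dataFrameToHivemallOps makes train_logregr available on any DataFrame;
    // the UDTF emits (feature, weight) rows that are averaged into a model.
    val model = trainDf
      .train_logregr(add_bias(col("features")), col("label"))
      .groupBy("feature").agg("weight" -> "avg")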
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
new file mode 100644
index 0000000..70cf00b
--- /dev/null
+++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+object HivemallUtils {
+
+ // Maximum number of dimensions for feature vectors
+ private[this] val maxDims = 100000000
+
+ /**
+ * Check whether the given schema contains a column of the required data type.
+ * @param colName column name
+ * @param dataType required column data type
+ */
+ private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType)
+ : Unit = {
+ val actualDataType = schema(colName).dataType
+ require(actualDataType.equals(dataType),
+ s"Column $colName must be of type $dataType but was actually $actualDataType.")
+ }
+
+ def to_vector_func(dense: Boolean, dims: Int): Seq[String] => Vector = {
+ if (dense) {
+ // Dense features
+ i: Seq[String] => {
+ val features = new Array[Double](dims)
+ i.map { ft =>
+ val s = ft.split(":").ensuring(_.size == 2)
+ features(s(0).toInt) = s(1).toDouble
+ }
+ Vectors.dense(features)
+ }
+ } else {
+ // Sparse features
+ i: Seq[String] => {
+ val features = i.map { ft =>
+ // val s = ft.split(":").ensuring(_.size == 2)
+ val s = ft.split(":")
+ (s(0).toInt, s(1).toDouble)
+ }
+ Vectors.sparse(dims, features)
+ }
+ }
+ }
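+
+ // e.g., to_vector_func(dense = true, dims = 4)(Seq("0:1.0", "2:0.5")) returns
+ // Vectors.dense(1.0, 0.0, 0.5, 0.0) (an illustrative check, not part of this API).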
+
+ def to_hivemall_features_func(): Vector => Array[String] = {
+ case dv: DenseVector =>
+ dv.values.zipWithIndex.map {
+ case (value, index) => s"$index:$value"
+ }
+ case sv: SparseVector =>
+ sv.values.zip(sv.indices).map {
+ case (value, index) => s"$index:$value"
+ }
+ case v =>
+ throw new IllegalArgumentException(s"Do not support vector type ${v.getClass}")
+ }
+
+ def append_bias_func(): Vector => Vector = {
+ case dv: DenseVector =>
+ val inputValues = dv.values
+ val inputLength = inputValues.length
+ val outputValues = Array.ofDim[Double](inputLength + 1)
+ System.arraycopy(inputValues, 0, outputValues, 0, inputLength)
+ outputValues(inputLength) = 1.0
+ Vectors.dense(outputValues)
+ case sv: SparseVector =>
+ val inputValues = sv.values
+ val inputIndices = sv.indices
+ val inputValuesLength = inputValues.length
+ val dim = sv.size
+ val outputValues = Array.ofDim[Double](inputValuesLength + 1)
+ val outputIndices = Array.ofDim[Int](inputValuesLength + 1)
+ System.arraycopy(inputValues, 0, outputValues, 0, inputValuesLength)
+ System.arraycopy(inputIndices, 0, outputIndices, 0, inputValuesLength)
+ outputValues(inputValuesLength) = 1.0
+ outputIndices(inputValuesLength) = dim
+ Vectors.sparse(dim + 1, outputIndices, outputValues)
+ case v =>
+ throw new IllegalArgumentException(s"Do not support vector type ${v.getClass}")
+ }
+
+ /**
+ * Transforms Hivemall features into a [[Vector]].
+ */
+ def to_vector(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
+ udf(to_vector_func(dense, dims))
+ }
+
+ /**
+ * Transforms a [[Vector]] into Hivemall features.
+ */
+ def to_hivemall_features: UserDefinedFunction = udf(to_hivemall_features_func)
+
+ /**
+ * Returns a new [[Vector]] with `1.0` (bias) appended to the input [[Vector]].
+ * @group ftvec
+ */
+ def append_bias: UserDefinedFunction = udf(append_bias_func)
+
+ /**
+ * Builds a [[Vector]]-based model from a table of Hivemall models.
+ */
+ def vectorized_model(df: DataFrame, dense: Boolean = false, dims: Int = maxDims)
+ : UserDefinedFunction = {
+ checkColumnType(df.schema, "feature", StringType)
+ checkColumnType(df.schema, "weight", DoubleType)
+
+ import df.sqlContext.implicits._
+ val intercept = df
+ .where($"feature" === "0")
+ .select($"weight")
+ .map { case Row(weight: Double) => weight}
+ .reduce(_ + _)
+ val weights = to_vector_func(dense, dims)(
+ df.select($"feature", $"weight")
+ .where($"feature" !== "0")
+ .map { case Row(label: String, feature: Double) => s"${label}:$feature"}
+ .collect.toSeq)
+
+ udf((input: Vector) => BLAS.dot(input, weights) + intercept)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
new file mode 100644
index 0000000..179b146
--- /dev/null
+++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hive.internal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+
+/**
+ * An implementation class for [[org.apache.spark.sql.hive.HivemallOps]].
+ * This class mainly uses internal Spark classes (e.g., `Generate` and `HiveGenericUDTF`)
+ * whose interfaces are unstable and may change in upcoming releases, so its role is to
+ * isolate these unstable parts from [[org.apache.spark.sql.hive.HivemallOps]].
+ */
+private[hive] object HivemallOpsImpl extends Logging {
+
+ def planHiveUDF(
+ className: String,
+ funcName: String,
+ argumentExprs: Seq[Column]): Expression = {
+ HiveSimpleUDF(
+ name = funcName,
+ funcWrapper = new HiveFunctionWrapper(className),
+ children = argumentExprs.map(_.expr)
+ )
+ }
+
+ def planHiveGenericUDF(
+ className: String,
+ funcName: String,
+ argumentExprs: Seq[Column]): Expression = {
+ HiveGenericUDF(
+ name = funcName,
+ funcWrapper = new HiveFunctionWrapper(className),
+ children = argumentExprs.map(_.expr)
+ )
+ }
+
+ def planHiveGenericUDTF(
+ df: DataFrame,
+ className: String,
+ funcName: String,
+ argumentExprs: Seq[Column],
+ outputAttrNames: Seq[String]): LogicalPlan = {
+ Generate(
+ generator = HiveGenericUDTF(
+ name = funcName,
+ funcWrapper = new HiveFunctionWrapper(className),
+ children = argumentExprs.map(_.expr)
+ ),
+ join = false,
+ outer = false,
+ qualifier = None,
+ generatorOutput = outputAttrNames.map(UnresolvedAttribute(_)),
+ child = df.logicalPlan)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/source/XGBoostFileFormat.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/source/XGBoostFileFormat.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/source/XGBoostFileFormat.scala
new file mode 100644
index 0000000..65cdf24
--- /dev/null
+++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/source/XGBoostFileFormat.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hive.source
+
+import java.io.File
+import java.io.IOException
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
+import org.apache.hadoop.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.ReflectionUtils
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+private[source] final class XGBoostOutputWriter(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private val hadoopConf = new SerializableConfiguration(context.getConfiguration)
+
+ override def write(row: InternalRow): Unit = {
+ val fields = row.toSeq(dataSchema)
+ val model = fields(1).asInstanceOf[Array[Byte]]
+ val filePath = new Path(new URI(s"$path"))
+ val fs = filePath.getFileSystem(hadoopConf.value)
+ val outputFile = fs.create(filePath)
+ outputFile.write(model)
+ outputFile.close()
+ }
+
+ override def close(): Unit = {}
+}
+
+object XGBoostOutputWriter {
+
+ /** Returns the compression codec extension to be used in a file name, e.g. ".gz". */
+ def getCompressionExtension(context: TaskAttemptContext): String = {
+ if (FileOutputFormat.getCompressOutput(context)) {
+ val codecClass = FileOutputFormat.getOutputCompressorClass(context, classOf[GzipCodec])
+ ReflectionUtils.newInstance(codecClass, context.getConfiguration).getDefaultExtension
+ } else {
+ ""
+ }
+ }
+}
+
+final class XGBoostFileFormat extends FileFormat with DataSourceRegister {
+
+ override def shortName(): String = "libxgboost"
+
+ override def toString: String = "XGBoost"
+
+ private def verifySchema(dataSchema: StructType): Unit = {
+ if (
+ dataSchema.size != 2 ||
+ !dataSchema(0).dataType.sameType(StringType) ||
+ !dataSchema(1).dataType.sameType(BinaryType)
+ ) {
+ throw new IOException(s"Illegal schema for XGBoost data, schema=$dataSchema")
+ }
+ }
+
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ Some(
+ StructType(
+ StructField("model_id", StringType, nullable = false) ::
+ StructField("pred_model", BinaryType, nullable = false) :: Nil)
+ )
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new XGBoostOutputWriter(path, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ XGBoostOutputWriter.getCompressionExtension(context) + ".xgboost"
+ }
+ }
+ }
+
+ override def buildReader(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ verifySchema(dataSchema)
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ (file: PartitionedFile) => {
+ val model = new Array[Byte](file.length.toInt)
+ val filePath = new Path(new URI(file.filePath))
+ val fs = filePath.getFileSystem(broadcastedHadoopConf.value.value)
+
+ var in: FSDataInputStream = null
+ try {
+ in = fs.open(filePath)
+ IOUtils.readFully(in, model, 0, model.length)
+ } finally {
+ IOUtils.closeStream(in)
+ }
+
+ val converter = RowEncoder(dataSchema)
+ val fullOutput = dataSchema.map { f =>
+ AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
+ }
+ val requiredOutput = fullOutput.filter { a =>
+ requiredSchema.fieldNames.contains(a.name)
+ }
+ val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
+ (requiredColumns(
+ converter.toRow(Row(new File(file.filePath).getName, model)))
+ :: Nil
+ ).toIterator
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/README.md
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/README.md b/spark/spark-2.2/src/test/resources/data/files/README.md
new file mode 100644
index 0000000..0fd0299
--- /dev/null
+++ b/spark/spark-2.2/src/test/resources/data/files/README.md
@@ -0,0 +1,3 @@
+The files in this dir exist to prevent exceptions in o.a.s.sql.hive.test.TestHive.
+We need to fix this issue in the future.
+
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/complex.seq
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/complex.seq b/spark/spark-2.2/src/test/resources/data/files/complex.seq
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/episodes.avro
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/episodes.avro b/spark/spark-2.2/src/test/resources/data/files/episodes.avro
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/json.txt
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/json.txt b/spark/spark-2.2/src/test/resources/data/files/json.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/kv1.txt
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/kv1.txt b/spark/spark-2.2/src/test/resources/data/files/kv1.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/data/files/kv3.txt
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/data/files/kv3.txt b/spark/spark-2.2/src/test/resources/data/files/kv3.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/resources/log4j.properties b/spark/spark-2.2/src/test/resources/log4j.properties
new file mode 100644
index 0000000..1db11f0
--- /dev/null
+++ b/spark/spark-2.2/src/test/resources/log4j.properties
@@ -0,0 +1,7 @@
+# Route all logging to the console, but only at FATAL level
+log4j.rootCategory=FATAL, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/scala/hivemall/mix/server/MixServerSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/scala/hivemall/mix/server/MixServerSuite.scala b/spark/spark-2.2/src/test/scala/hivemall/mix/server/MixServerSuite.scala
new file mode 100644
index 0000000..9bbd3f0
--- /dev/null
+++ b/spark/spark-2.2/src/test/scala/hivemall/mix/server/MixServerSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package hivemall.mix.server
+
+import java.util.Random
+import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.logging.Logger
+
+import hivemall.mix.MixMessage.MixEventName
+import hivemall.mix.client.MixClient
+import hivemall.mix.server.MixServer.ServerState
+import hivemall.model.{DenseModel, PredictionModel, WeightValue}
+import hivemall.utils.io.IOUtils
+import hivemall.utils.lang.CommandLineUtils
+import hivemall.utils.net.NetUtils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+class MixServerSuite extends FunSuite with BeforeAndAfter {
+
+ private[this] var server: MixServer = _
+ private[this] var executor: ExecutorService = _
+ private[this] var port: Int = _
+
+ private[this] val rand = new Random(43)
+ private[this] val counter = Stream.from(0).iterator
+
+ private[this] val eachTestTime = 100
+ private[this] val logger =
+ Logger.getLogger(classOf[MixServerSuite].getName)
+
+ before {
+ this.port = NetUtils.getAvailablePort
+ this.server = new MixServer(
+ CommandLineUtils.parseOptions(
+ Array("-port", s"${port}", "-sync_threshold", "3"),
+ MixServer.getOptions()
+ )
+ )
+ this.executor = Executors.newSingleThreadExecutor
+ this.executor.submit(server)
+ var retry = 0
+ while (server.getState() != ServerState.RUNNING && retry < 50) {
+ Thread.sleep(1000L)
+ retry += 1
+ }
+ assert(server.getState == ServerState.RUNNING)
+ }
+
+ after { this.executor.shutdown() }
+
+ private[this] def clientDriver(
+ groupId: String, model: PredictionModel, numMsg: Int = 1000000): Unit = {
+ var client: MixClient = null
+ try {
+ client = new MixClient(MixEventName.average, groupId, s"localhost:${port}", false, 2, model)
+ model.configureMix(client, false)
+ model.configureClock()
+
+ for (_ <- 0 until numMsg) {
+ val feature = Integer.valueOf(rand.nextInt(model.size))
+ model.set(feature, new WeightValue(1.0f))
+ }
+
+ Thread.sleep(eachTestTime * 1000 + 100L)
+ assert(model.getNumMixed > 0)
+ } finally {
+ IOUtils.closeQuietly(client)
+ }
+ }
+
+ private[this] def fixedGroup: (String, () => String) =
+ ("fixed", () => "fixed")
+ private[this] def uniqueGroup: (String, () => String) =
+ ("unique", () => s"${counter.next}")
+
+ Seq(65536).foreach { ndims =>
+ Seq(4).foreach { nclient =>
+ Seq(fixedGroup, uniqueGroup).foreach { id =>
+ val testName = s"dense-dim:${ndims}-clinet:${nclient}-${id._1}"
+ ignore(testName) {
+ val clients = Executors.newCachedThreadPool()
+ val numClients = nclient
+ val models = (0 until numClients).map(_ => new DenseModel(ndims, false))
+ (0 until numClients).foreach { i =>
+ clients.submit(new Runnable() {
+ override def run(): Unit = {
+ try {
+ clientDriver(
+ s"${testName}-${id._2}",
+ models(i)
+ )
+ } catch {
+ case e: InterruptedException =>
+ assert(false, e.getMessage)
+ }
+ }
+ })
+ }
+ clients.awaitTermination(eachTestTime, TimeUnit.SECONDS)
+ clients.shutdown()
+ val nMixes = models.map(d => d.getNumMixed).reduce(_ + _)
+ logger.info(s"${testName} --> ${(nMixes + 0.0) / eachTestTime} mixes/s")
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala b/spark/spark-2.2/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
new file mode 100644
index 0000000..c127276
--- /dev/null
+++ b/spark/spark-2.2/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package hivemall.tools
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.hive.test.TestHive
+
+class RegressionDatagenSuite extends FunSuite {
+
+ test("datagen") {
+ val df = RegressionDatagen.exec(
+ TestHive, min_examples = 10000, n_features = 100, n_dims = 65536, dense = false, cl = true)
+ assert(df.count() >= 10000)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/scala/org/apache/spark/SparkFunSuite.scala b/spark/spark-2.2/src/test/scala/org/apache/spark/SparkFunSuite.scala
new file mode 100644
index 0000000..ed1bb6a
--- /dev/null
+++ b/spark/spark-2.2/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark
+
+// scalastyle:off
+import org.scalatest.{FunSuite, Outcome}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Base abstract class for all unit tests in Spark for handling common functionality.
+ */
+private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
+// scalastyle:on
+
+ /**
+ * Log the suite name and the test name before and after each test.
+ *
+ * Subclasses should never override this method. If they wish to run
+ * custom code before and after each test, they should mix in the
+ * {{org.scalatest.BeforeAndAfter}} trait instead.
+ */
+ final protected override def withFixture(test: NoArgTest): Outcome = {
+ val testName = test.text
+ val suiteName = this.getClass.getName
+ val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
+ try {
+ logInfo(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n")
+ test()
+ } finally {
+ logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/8bf6dd9e/spark/spark-2.2/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.2/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala b/spark/spark-2.2/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
new file mode 100644
index 0000000..903dc0a
--- /dev/null
+++ b/spark/spark-2.2/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkFunSuite
+
+class HivemallLabeledPointSuite extends SparkFunSuite {
+
+ test("toString") {
+ val lp = HivemallLabeledPoint(1.0f, Seq("1:0.5", "3:0.3", "8:0.1"))
+ assert(lp.toString === "1.0,[1:0.5,3:0.3,8:0.1]")
+ }
+
+ test("parse") {
+ val lp = HivemallLabeledPoint.parse("1.0,[1:0.5,3:0.3,8:0.1]")
+ assert(lp.label === 1.0)
+ assert(lp.features === Seq("1:0.5", "3:0.3", "8:0.1"))
+ }
+}