You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2017/04/09 21:43:39 UTC
[1/3] incubator-hivemall git commit: Close #67: [HIVEMALL-55][SPARK]
Drop the Spark-v1.6 support
Repository: incubator-hivemall
Updated Branches:
refs/heads/master 8dc3a024d -> c53b9ff9b
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
deleted file mode 100644
index df82547..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{Column, Row}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.types._
-import org.apache.spark.test.HivemallQueryTest
-import org.apache.spark.test.TestDoubleWrapper._
-import org.apache.spark.test.TestUtils._
-
-final class HivemallOpsSuite extends HivemallQueryTest {
-
- test("knn.similarity") {
- val row1 = DummyInputData.select(cosine_sim(Seq(1, 2, 3, 4), Seq(3, 4, 5, 6))).collect
- assert(row1(0).getFloat(0) ~== 0.500f)
-
- val row2 = DummyInputData.select(jaccard(5, 6)).collect
- assert(row2(0).getFloat(0) ~== 0.96875f)
-
- val row3 = DummyInputData.select(angular_similarity(Seq(1, 2, 3), Seq(4, 5, 6))).collect
- assert(row3(0).getFloat(0) ~== 0.500f)
-
- val row4 = DummyInputData.select(euclid_similarity(Seq(5, 3, 1), Seq(2, 8, 3))).collect
- assert(row4(0).getFloat(0) ~== 0.33333334f)
-
- // TODO: $"c0" throws AnalysisException, why?
- // val row5 = DummyInputData.select(distance2similarity(DummyInputData("c0"))).collect
- // assert(row5(0).getFloat(0) ~== 0.1f)
- }
-
- test("knn.distance") {
- val row1 = DummyInputData.select(hamming_distance(1, 3)).collect
- assert(row1(0).getInt(0) == 1)
-
- val row2 = DummyInputData.select(popcnt(1)).collect
- assert(row2(0).getInt(0) == 1)
-
- val row3 = DummyInputData.select(kld(0.1, 0.5, 0.2, 0.5)).collect
- assert(row3(0).getDouble(0) ~== 0.01)
-
- val row4 = DummyInputData.select(
- euclid_distance(Seq("0.1", "0.5"), Seq("0.2", "0.5"))).collect
- assert(row4(0).getFloat(0) ~== 1.4142135f)
-
- val row5 = DummyInputData.select(
- cosine_distance(Seq("0.8", "0.3"), Seq("0.4", "0.6"))).collect
- assert(row5(0).getFloat(0) ~== 1.0f)
-
- val row6 = DummyInputData.select(
- angular_distance(Seq("0.1", "0.1"), Seq("0.3", "0.8"))).collect
- assert(row6(0).getFloat(0) ~== 0.50f)
-
- val row7 = DummyInputData.select(
- manhattan_distance(Seq("0.7", "0.8"), Seq("0.5", "0.6"))).collect
- assert(row7(0).getFloat(0) ~== 4.0f)
-
- val row8 = DummyInputData.select(
- minkowski_distance(Seq("0.1", "0.2"), Seq("0.2", "0.2"), 1.0)).collect
- assert(row8(0).getFloat(0) ~== 2.0f)
- }
-
- test("knn.lsh") {
- import hiveContext.implicits._
- assert(IntList2Data.minhash(1, $"target").count() > 0)
-
- assert(DummyInputData.select(bbit_minhash(Seq("1:0.1", "2:0.5"), false)).count
- == DummyInputData.count)
- assert(DummyInputData.select(minhashes(Seq("1:0.1", "2:0.5"), false)).count
- == DummyInputData.count)
- }
-
- test("ftvec - add_bias") {
- // TODO: This import does not work and why?
- // import hiveContext.implicits._
- assert(TinyTrainData.select(add_bias(TinyTrainData.col("features"))).collect.toSet
- === Set(
- Row(Seq("1:0.8", "2:0.2", "0:1.0")),
- Row(Seq("2:0.7", "0:1.0")),
- Row(Seq("1:0.9", "0:1.0"))))
- }
-
- test("ftvec - extract_feature") {
- val row = DummyInputData.select(extract_feature("1:0.8")).collect
- assert(row(0).getString(0) == "1")
- }
-
- test("ftvec - extract_weight") {
- val row = DummyInputData.select(extract_weight("3:0.1")).collect
- assert(row(0).getDouble(0) ~== 0.1)
- }
-
- test("ftvec - explode_array") {
- import hiveContext.implicits._
- assert(TinyTrainData.explode_array("features")
- .select($"feature").collect.toSet
- === Set(Row("1:0.8"), Row("2:0.2"), Row("2:0.7"), Row("1:0.9")))
- }
-
- test("ftvec - add_feature_index") {
- // import hiveContext.implicits._
- val doubleListData = {
- // TODO: Use `toDF`
- val rowRdd = hiveContext.sparkContext.parallelize(
- Row(0.8 :: 0.5 :: Nil) ::
- Row(0.3 :: 0.1 :: Nil) ::
- Row(0.2 :: Nil) ::
- Nil
- )
- hiveContext.createDataFrame(
- rowRdd,
- StructType(
- StructField("data", ArrayType(DoubleType), true) ::
- Nil)
- )
- }
-
- assert(doubleListData.select(
- add_feature_index(doubleListData.col("data"))).collect.toSet
- === Set(
- Row(Seq("1:0.8", "2:0.5")),
- Row(Seq("1:0.3", "2:0.1")),
- Row(Seq("1:0.2"))))
- }
-
- test("ftvec - sort_by_feature") {
- // import hiveContext.implicits._
- val intFloatMapData = {
- // TODO: Use `toDF`
- val rowRdd = hiveContext.sparkContext.parallelize(
- Row(Map(1 -> 0.3f, 2 -> 0.1f, 3 -> 0.5f)) ::
- Row(Map(2 -> 0.4f, 1 -> 0.2f)) ::
- Row(Map(2 -> 0.4f, 3 -> 0.2f, 1 -> 0.1f, 4 -> 0.6f)) ::
- Nil
- )
- hiveContext.createDataFrame(
- rowRdd,
- StructType(
- StructField("data", MapType(IntegerType, FloatType), true) ::
- Nil)
- )
- }
-
- val sortedKeys = intFloatMapData.select(sort_by_feature(intFloatMapData.col("data")))
- .collect.map {
- case Row(m: Map[Int, Float]) => m.keysIterator.toSeq
- }
-
- assert(sortedKeys.toSet === Set(Seq(1, 2, 3), Seq(1, 2), Seq(1, 2, 3, 4)))
- }
-
- test("ftvec.hash") {
- assert(DummyInputData.select(mhash("test")).count == DummyInputData.count)
- assert(DummyInputData.select(sha1("test")).count == DummyInputData.count)
- // assert(DummyInputData.select(array_hash_values(Seq("aaa", "bbb"))).count
- // == DummyInputData.count)
- // assert(DummyInputData.select(prefixed_hash_values(Seq("ccc", "ddd"), "prefix")).count
- // == DummyInputData.count)
- }
-
- test("ftvec.scaling") {
- assert(TinyTrainData.select(rescale(2.0f, 1.0, 5.0)).collect.toSet
- === Set(Row(0.25f)))
- assert(TinyTrainData.select(zscore(1.0f, 0.5, 0.5)).collect.toSet
- === Set(Row(1.0f)))
- assert(TinyTrainData.select(normalize(TinyTrainData.col("features"))).collect.toSet
- === Set(
- Row(Seq("1:0.9701425", "2:0.24253562")),
- Row(Seq("2:1.0")),
- Row(Seq("1:1.0"))))
- }
-
- test("ftvec.selection - chi2") {
- import hiveContext.implicits._
-
- // see also hivemall.ftvec.selection.ChiSquareUDFTest
- val df = Seq(
- Seq(
- Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
- Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
- Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)
- ) -> Seq(
- Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
- Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
- Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589)))
- .toDF("arg0", "arg1")
-
- val result = df.select(chi2(df("arg0"), df("arg1"))).collect
- assert(result.length == 1)
- val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0)
- val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1)
-
- (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759))
- .zipped
- .foreach((actual, expected) => assert(actual ~== expected))
-
- (pVal, Seq(4.47651499e-03, 1.65754167e-01, 5.94344354e-26, 2.50017968e-15))
- .zipped
- .foreach((actual, expected) => assert(actual ~== expected))
- }
-
- test("ftvec.conv - quantify") {
- import hiveContext.implicits._
- val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
- // This test is done in a single parition because `HivemallOps#quantify` assigns indentifiers
- // for non-numerical values in each partition.
- assert(testDf.coalesce(1).quantify(Seq[Column](true) ++ testDf.cols: _*).collect.toSet
- === Set(Row(1, 0, 0), Row(2, 1, 1), Row(3, 0, 1)))
- }
-
- test("ftvec.amplify") {
- import hiveContext.implicits._
- assert(TinyTrainData.amplify(3, $"label", $"features").count() == 9)
- assert(TinyTrainData.rand_amplify(3, "-buf 128", $"label", $"features").count() == 9)
- assert(TinyTrainData.part_amplify(3).count() == 9)
- }
-
- ignore("ftvec.conv") {
- import hiveContext.implicits._
-
- val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1, 0, "2:0.2" :: Nil)).toDF("a", "b")
- assert(df1.select(to_dense_features(df1("b"), 3)).collect.toSet
- === Set(Row(Array(0.1f, 0.0f, 0.3f)), Array(0.0f, 0.2f, 0.0f)))
-
- val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
- assert(df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))).collect.toSet
- === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4"))))
- }
-
- test("ftvec.trans") {
- import hiveContext.implicits._
-
- val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c")
- assert(df1.binarize_label($"a", $"b", $"c").collect.toSet === Set(Row(1, 1)))
-
- val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b")
- assert(df2.select(vectorize_features(Seq("a", "b"), df2("a"), df2("b"))).collect.toSet
- === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
-
- val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b")
- assert(df3.select(categorical_features(Seq("a", "b"), df3("a"), df3("b"))).collect.toSet
- === Set(Row(Seq("a#c11", "b#c12")), Row(Seq("a#c21", "b#c22"))))
-
- val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
- assert(df4.select(indexed_features(df4("a"), df4("b"), df4("c"))).collect.toSet
- === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4"))))
-
- val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c")
- assert(df5.coalesce(1).quantified_features(true, df5("a"), df5("b"), df5("c")).collect.toSet
- === Set(Row(Seq(0.0, 0.0, 0.0)), Row(Seq(1.0, 0.0, 1.0))))
-
- val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b")
- assert(df6.select(quantitative_features(Seq("a", "b"), df6("a"), df6("b"))).collect.toSet
- === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
- }
-
- test("misc - hivemall_version") {
- assert(DummyInputData.select(hivemall_version()).collect.toSet === Set(Row("0.4.2-rc.2")))
- /**
- * TODO: Why a test below does fail?
- *
- * checkAnswer(
- * DummyInputData.select(hivemall_version()).distinct,
- * Row("0.3.1")
- * )
- *
- * The test throw an exception below:
- *
- * - hivemall_version *** FAILED ***
- * org.apache.spark.sql.AnalysisException:
- * Cannot resolve column name "HiveSimpleUDF#hivemall.HivemallVersionUDF()" among
- * (HiveSimpleUDF#hivemall.Hivemall VersionUDF());
- * at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
- * at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
- * at scala.Option.getOrElse(Option.scala:120)
- * at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
- * at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
- * at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
- * at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
- * ...
- */
- }
-
- test("misc - rowid") {
- val df = DummyInputData.select(rowid())
- assert(df.distinct.count == df.count)
- }
-
- test("misc - each_top_k") {
- import hiveContext.implicits._
- val testDf = Seq(
- ("a", "1", 0.5), ("b", "5", 0.1), ("a", "3", 0.8), ("c", "6", 0.3), ("b", "4", 0.3),
- ("a", "2", 0.6)
- ).toDF("group", "attr", "value")
-
- // Compute top-1 rows for each group
- assert(
- testDf.each_top_k(1, $"group", $"value", testDf("attr")).collect.toSet ===
- Set(
- Row(1, 0.8, "3"),
- Row(1, 0.3, "4"),
- Row(1, 0.3, "6")
- ))
-
- // Compute reverse top-1 rows for each group
- assert(
- testDf.each_top_k(-1, $"group", $"value", testDf("attr")).collect.toSet ===
- Set(
- Row(1, 0.5, "1"),
- Row(1, 0.1, "5"),
- Row(1, 0.3, "6")
- ))
- }
-
- /**
- * This test fails because;
- *
- * Cause: java.lang.OutOfMemoryError: Java heap space
- * at hivemall.smile.tools.RandomForestEnsembleUDAF$Result.<init>
- * (RandomForestEnsembleUDAF.java:128)
- * at hivemall.smile.tools.RandomForestEnsembleUDAF$RandomForestPredictUDAFEvaluator
- * .terminate(RandomForestEnsembleUDAF.java:91)
- * at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- */
- ignore("misc - tree_predict") {
- import hiveContext.implicits._
-
- val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil))
- .toDF("label", "features")
- .train_randomforest_regr($"features", $"label", "-trees 2")
-
- val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: Nil))
- .toDF("label", "features")
- .select(rowid(), $"label", $"features")
-
- val predicted = model
- .join(testData).coalesce(1)
- .select(
- $"rowid",
- tree_predict(
- model("model_id"), model("model_type"), model("pred_model"), testData("features"), true)
- .as("predicted")
- )
- .groupby($"rowid")
- .rf_ensemble("predicted").as("rowid", "predicted")
- .select($"predicted.label")
-
- checkAnswer(predicted, Seq(Row(0), Row(1)))
- }
-
- test("tools.array - select_k_best") {
- import hiveContext.implicits._
- import org.apache.spark.sql.functions._
-
- val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
- val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list")
- val k = 2
-
- // if use checkAnswer here, fail for some reason, maybe type? but it's okay on spark-2.0
- assert(df.select(select_k_best(df("features"), df("importance_list"), lit(k))).collect ===
- data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))))
- }
-
- test("misc - sigmoid") {
- import hiveContext.implicits._
- /**
- * TODO: SigmodUDF only accepts floating-point types in spark-v1.5.0?
- * This test throws an exception below:
- *
- * org.apache.spark.sql.catalyst.analysis.UnresolvedException:
- * Invalid call to dataType on unresolved object, tree: 'data
- * at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType
- * (unresolved.scala:59)
- * at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$method$1.apply(hiveUDFs.scala:119)
- * ...
- */
- val rows = DummyInputData.select(sigmoid($"c0")).collect
- assert(rows(0).getDouble(0) ~== 0.500)
- assert(rows(1).getDouble(0) ~== 0.731)
- assert(rows(2).getDouble(0) ~== 0.880)
- assert(rows(3).getDouble(0) ~== 0.952)
- }
-
- test("misc - lr_datagen") {
- assert(TinyTrainData.lr_datagen("-n_examples 100 -n_features 10 -seed 100").count >= 100)
- }
-
- test("invoke regression functions") {
- import hiveContext.implicits._
- Seq(
- "train_adadelta",
- "train_adagrad",
- "train_arow_regr",
- "train_arowe_regr",
- "train_arowe2_regr",
- "train_logregr",
- "train_pa1_regr",
- "train_pa1a_regr",
- "train_pa2_regr",
- "train_pa2a_regr"
- ).map { func =>
- invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
- .foreach(_ => {}) // Just call it
- }
- }
-
- test("invoke classifier functions") {
- import hiveContext.implicits._
- Seq(
- "train_perceptron",
- "train_pa",
- "train_pa1",
- "train_pa2",
- "train_cw",
- "train_arow",
- "train_arowh",
- "train_scw",
- "train_scw2",
- "train_adagrad_rda"
- ).map { func =>
- invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
- .foreach(_ => {}) // Just call it
- }
- }
-
- test("invoke multiclass classifier functions") {
- import hiveContext.implicits._
- Seq(
- "train_multiclass_perceptron",
- "train_multiclass_pa",
- "train_multiclass_pa1",
- "train_multiclass_pa2",
- "train_multiclass_cw",
- "train_multiclass_arow",
- "train_multiclass_scw",
- "train_multiclass_scw2"
- ).map { func =>
- // TODO: Why is a label type [Int|Text] only in multiclass classifiers?
- invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label".cast(IntegerType)))
- .foreach(_ => {}) // Just call it
- }
- }
-
- test("invoke random forest functions") {
- import hiveContext.implicits._
- val testDf = Seq(
- (Array(0.3, 0.1, 0.2), 1),
- (Array(0.3, 0.1, 0.2), 0),
- (Array(0.3, 0.1, 0.2), 0)).toDF.as("features", "label")
- Seq(
- "train_randomforest_regr",
- "train_randomforest_classifier"
- ).map { func =>
- invokeFunc(new HivemallOps(testDf.coalesce(1)), func, Seq($"features", $"label"))
- .foreach(_ => {}) // Just call it
- }
- }
-
- ignore("check regression precision") {
- Seq(
- "train_adadelta",
- "train_adagrad",
- "train_arow_regr",
- "train_arowe_regr",
- "train_arowe2_regr",
- "train_logregr",
- "train_pa1_regr",
- "train_pa1a_regr",
- "train_pa2_regr",
- "train_pa2a_regr"
- ).map { func =>
- checkRegrPrecision(func)
- }
- }
-
- ignore("check classifier precision") {
- Seq(
- "train_perceptron",
- "train_pa",
- "train_pa1",
- "train_pa2",
- "train_cw",
- "train_arow",
- "train_arowh",
- "train_scw",
- "train_scw2",
- "train_adagrad_rda"
- ).map { func =>
- checkClassifierPrecision(func)
- }
- }
-
- test("user-defined aggregators for ensembles") {
- import hiveContext.implicits._
-
- val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.as("c0", "c1")
- val row1 = df1.groupby($"c0").voted_avg("c1").collect
- assert(row1(0).getDouble(1) ~== 0.15)
- assert(row1(1).getDouble(1) ~== 0.10)
-
- val df2 = Seq((1, 0.6f), (2, 0.2f), (1, 0.2f)).toDF.as("c0", "c1")
- val row2 = df2.groupby($"c0").agg("c1" -> "voted_avg").collect
- assert(row2(0).getDouble(1) ~== 0.40)
- assert(row2(1).getDouble(1) ~== 0.20)
-
- val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.as("c0", "c1")
- val row3 = df3.groupby($"c0").weight_voted_avg("c1").collect
- assert(row3(0).getDouble(1) ~== 0.50)
- assert(row3(1).getDouble(1) ~== 0.30)
-
- val df4 = Seq((1, 0.1f), (2, 0.9f), (1, 0.1f)).toDF.as("c0", "c1")
- val row4 = df4.groupby($"c0").agg("c1" -> "weight_voted_avg").collect
- assert(row4(0).getDouble(1) ~== 0.10)
- assert(row4(1).getDouble(1) ~== 0.90)
-
- val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.as("c0", "c1", "c2")
- val row5 = df5.groupby($"c0").argmin_kld("c1", "c2").collect
- assert(row5(0).getFloat(1) ~== 0.266666666)
- assert(row5(1).getFloat(1) ~== 0.80)
-
- val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.as("c0", "c1", "c2")
- val row6 = df6.groupby($"c0").max_label("c2", "c1").collect
- assert(row6(0).getString(1) == "id-1")
-
- val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.as("c0", "c1", "c2")
- val row7 = df7.groupby($"c0").maxrow("c2", "c1").as("c0", "c1").select($"c1.col1").collect
- assert(row7(0).getString(0) == "id-0")
-
- // val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.as("c0", "c1")
- // val row8 = df8.groupby($"c0").rf_ensemble("c1").as("c0", "c1")
- // .select("c1.probability").collect
- // assert(row8(0).getDouble(0) ~== 0.3333333333)
- // assert(row8(1).getDouble(0) ~== 1.0)
-
- // val df9 = Seq((1, 3), (1, 8), (2, 9), (1, 1)).toDF.as("c0", "c1")
- // val row9 = df9.groupby($"c0").agg("c1" -> "rf_ensemble").as("c0", "c1")
- // .select("c1.probability").collect
- // assert(row9(0).getDouble(0) ~== 0.3333333333)
- // assert(row9(1).getDouble(0) ~== 1.0)
- }
-
- test("user-defined aggregators for evaluation") {
- import hiveContext.implicits._
-
- val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF.as("c0", "c1", "c2")
- val row1 = df1.groupby($"c0").mae("c1", "c2").collect
- assert(row1(0).getDouble(1) ~== 0.26666666)
-
- val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
- val row2 = df2.groupby($"c0").mse("c1", "c2").collect
- assert(row2(0).getDouble(1) ~== 0.29999999)
-
- val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
- val row3 = df3.groupby($"c0").rmse("c1", "c2").collect
- assert(row3(0).getDouble(1) ~== 0.54772253)
-
- val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF
- .as("c0", "c1", "c2")
- val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
- assert(row4(0).getDouble(1) ~== 0.25)
- }
-
- ignore("user-defined aggregators for ftvec.trans") {
- import hiveContext.implicits._
-
- val df0 = Seq((1, "cat", "mammal", 9), (1, "dog", "mammal", 10), (1, "human", "mammal", 10),
- (1, "seahawk", "bird", 101), (1, "wasp", "insect", 3), (1, "wasp", "insect", 9),
- (1, "cat", "mammal", 101), (1, "dog", "mammal", 1), (1, "human", "mammal", 9))
- .toDF("col0", "cat1", "cat2", "cat3")
- val row00 = df0.groupby($"col0").onehot_encoding("cat1")
- val row01 = df0.groupby($"col0").onehot_encoding("cat1", "cat2", "cat3")
-
- val result000 = row00.collect()(0).getAs[Row](1).getAs[Map[String, Int]](0)
- val result01 = row01.collect()(0).getAs[Row](1)
- val result010 = result01.getAs[Map[String, Int]](0)
- val result011 = result01.getAs[Map[String, Int]](1)
- val result012 = result01.getAs[Map[String, Int]](2)
-
- assert(result000.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
- assert(result000.values.toSet === Set(1, 2, 3, 4, 5))
- assert(result010.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
- assert(result010.values.toSet === Set(1, 2, 3, 4, 5))
- assert(result011.keySet === Set("bird", "insect", "mammal"))
- assert(result011.values.toSet === Set(6, 7, 8))
- assert(result012.keySet === Set(1, 3, 9, 10, 101))
- assert(result012.values.toSet === Set(9, 10, 11, 12, 13))
- }
-
- test("user-defined aggregators for ftvec.selection") {
- import hiveContext.implicits._
-
- // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest
- // binary class
- // +-----------------+-------+
- // | features | class |
- // +-----------------+-------+
- // | 5.1,3.5,1.4,0.2 | 0 |
- // | 4.9,3.0,1.4,0.2 | 0 |
- // | 4.7,3.2,1.3,0.2 | 0 |
- // | 7.0,3.2,4.7,1.4 | 1 |
- // | 6.4,3.2,4.5,1.5 | 1 |
- // | 6.9,3.1,4.9,1.5 | 1 |
- // +-----------------+-------+
- val df0 = Seq(
- (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0)),
- (1, Seq(4.7, 3.2, 1.3, 0.2), Seq(1, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1)),
- (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1)), (1, Seq(6.9, 3.1, 4.9, 1.5), Seq(0, 1)))
- .toDF("c0", "arg0", "arg1")
- val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
- (row0(0).getAs[Seq[Double]](1), Seq(4.38425236, 0.26390002, 15.83984511, 26.87005769))
- .zipped
- .foreach((actual, expected) => assert(actual ~== expected))
-
- // multiple class
- // +-----------------+-------+
- // | features | class |
- // +-----------------+-------+
- // | 5.1,3.5,1.4,0.2 | 0 |
- // | 4.9,3.0,1.4,0.2 | 0 |
- // | 7.0,3.2,4.7,1.4 | 1 |
- // | 6.4,3.2,4.5,1.5 | 1 |
- // | 6.3,3.3,6.0,2.5 | 2 |
- // | 5.8,2.7,5.1,1.9 | 2 |
- // +-----------------+-------+
- val df1 = Seq(
- (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
- (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
- (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
- .toDF("c0", "arg0", "arg1")
- val row1 = df1.groupby($"c0").snr("arg0", "arg1").collect
- (row1(0).getAs[Seq[Double]](1), Seq(8.43181818, 1.32121212, 42.94949495, 33.80952381))
- .zipped
- .foreach((actual, expected) => assert(actual ~== expected))
- }
-
- test("user-defined aggregators for tools.matrix") {
- import hiveContext.implicits._
-
- // | 1 2 3 |T | 5 6 7 |
- // | 3 4 5 | * | 7 8 9 |
- val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
- .toDF("c0", "arg0", "arg1")
-
- // if use checkAnswer here, fail for some reason, maybe type? but it's okay on spark-2.0
- assert(df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect() ===
- Seq(Row(1, Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
deleted file mode 100644
index 276bc13..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.io.{BufferedInputStream, BufferedReader, InputStream, InputStreamReader}
-import java.net.URL
-import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService}
-
-import hivemall.mix.server.MixServer
-import hivemall.mix.server.MixServer.ServerState
-import hivemall.utils.lang.CommandLineUtils
-import hivemall.utils.net.NetUtils
-import org.apache.commons.cli.Options
-import org.apache.commons.compress.compressors.CompressorStreamFactory
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.sql.{Column, DataFrame, Row}
-import org.apache.spark.sql.functions.when
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.test.TestUtils.{benchmark, invokeFunc}
-
-final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
-
- // Load A9a training and test data
- val a9aLineParser = (line: String) => {
- val elements = line.split(" ")
- val (label, features) = (elements.head, elements.tail)
- HivemallLabeledPoint(if (label == "+1") 1.0f else 0.0f, features)
- }
-
- lazy val trainA9aData: DataFrame =
- getDataFromURI(
- new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a").openStream,
- a9aLineParser)
-
- lazy val testA9aData: DataFrame =
- getDataFromURI(
- new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a.t").openStream,
- a9aLineParser)
-
- // Load A9a training and test data
- val kdd2010aLineParser = (line: String) => {
- val elements = line.split(" ")
- val (label, features) = (elements.head, elements.tail)
- HivemallLabeledPoint(if (label == "1") 1.0f else 0.0f, features)
- }
-
- lazy val trainKdd2010aData: DataFrame =
- getDataFromURI(
- new CompressorStreamFactory().createCompressorInputStream(
- new BufferedInputStream(
- new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.bz2")
- .openStream
- )
- ),
- kdd2010aLineParser,
- 8)
-
- lazy val testKdd2010aData: DataFrame =
- getDataFromURI(
- new CompressorStreamFactory().createCompressorInputStream(
- new BufferedInputStream(
- new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2")
- .openStream
- )
- ),
- kdd2010aLineParser,
- 8)
-
- // Placeholder for a mix server
- var mixServExec: ExecutorService = _
- var assignedPort: Int = _
-
- private def getDataFromURI(
- in: InputStream, lineParseFunc: String => HivemallLabeledPoint, numPart: Int = 2)
- : DataFrame = {
- val reader = new BufferedReader(new InputStreamReader(in))
- try {
- // Cache all data because stream closed soon
- val lines = FileIterator(reader.readLine()).toSeq
- val rdd = TestHive.sparkContext.parallelize(lines, numPart).map(lineParseFunc)
- val df = rdd.toDF.cache
- df.foreach(_ => {})
- df
- } finally {
- reader.close()
- }
- }
-
- before {
- assert(mixServExec == null)
-
- // Launch a MIX server as thread
- assignedPort = NetUtils.getAvailablePort
- val method = classOf[MixServer].getDeclaredMethod("getOptions")
- method.setAccessible(true)
- val options = method.invoke(null).asInstanceOf[Options]
- val cl = CommandLineUtils.parseOptions(
- Array(
- "-port", Integer.toString(assignedPort),
- "-sync_threshold", "1"
- ),
- options
- )
- val server = new MixServer(cl)
- mixServExec = Executors.newSingleThreadExecutor()
- mixServExec.submit(server)
- var retry = 0
- while (server.getState() != ServerState.RUNNING && retry < 32) {
- Thread.sleep(100L)
- retry += 1
- }
- assert(ServerState.RUNNING == server.getState)
- }
-
- after {
- mixServExec.shutdownNow()
- mixServExec = null
- }
-
- benchmark("model mixing test w/ regression") {
- Seq(
- "train_adadelta",
- "train_adagrad",
- "train_arow_regr",
- "train_arowe_regr",
- "train_arowe2_regr",
- "train_logregr",
- "train_pa1_regr",
- "train_pa1a_regr",
- "train_pa2_regr",
- "train_pa2a_regr"
- ).map { func =>
- // Build a model
- val model = {
- val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
- val res = invokeFunc(new HivemallOps(trainA9aData.part_amplify(1)), func,
- Seq[Column](add_bias($"features"), $"label",
- s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel"))
- if (!res.columns.contains("conv")) {
- res.groupby("feature").agg("weight" -> "avg")
- } else {
- res.groupby("feature").argmin_kld("weight", "conv")
- }
- }.as("feature", "weight")
-
- // Data preparation
- val testDf = testA9aData
- .select(rowid(), $"label".as("target"), $"features")
- .cache
-
- val testDf_exploded = testDf
- .explode_array($"features")
- .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
- // Do prediction
- val predict = testDf_exploded
- .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
- .select($"rowid", ($"weight" * $"value").as("value"))
- .groupby("rowid").sum("value")
- .as("rowid", "predicted")
-
- // Evaluation
- val eval = predict
- .join(testDf, predict("rowid") === testDf("rowid"))
- .groupby()
- .agg(Map("target" -> "avg", "predicted" -> "avg"))
- .as("target", "predicted")
-
- val (target, predicted) = eval.map {
- case Row(target: Double, predicted: Double) => (target, predicted)
- }.first
-
- // scalastyle:off println
- println(s"func:${func} target:${target} predicted:${predicted} "
- + s"diff:${Math.abs(target - predicted)}")
-
- testDf.unpersist()
- }
- }
-
- benchmark("model mixing test w/ binary classification") {
- Seq(
- "train_perceptron",
- "train_pa",
- "train_pa1",
- "train_pa2",
- "train_cw",
- "train_arow",
- "train_arowh",
- "train_scw",
- "train_scw2",
- "train_adagrad_rda"
- ).map { func =>
- // Build a model
- val model = {
- val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
- val res = invokeFunc(new HivemallOps(trainKdd2010aData.part_amplify(1)), func,
- Seq[Column](add_bias($"features"), $"label",
- s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel"))
- if (!res.columns.contains("conv")) {
- res.groupby("feature").agg("weight" -> "avg")
- } else {
- res.groupby("feature").argmin_kld("weight", "conv")
- }
- }.as("feature", "weight")
-
- // Data preparation
- val testDf = testKdd2010aData
- .select(rowid(), $"label".as("target"), $"features")
- .cache
-
- val testDf_exploded = testDf
- .explode_array($"features")
- .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
- // Do prediction
- val predict = testDf_exploded
- .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
- .select($"rowid", ($"weight" * $"value").as("value"))
- .groupby("rowid").sum("value")
- .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
- .as("rowid", "predicted")
-
- // Evaluation
- val eval = predict
- .join(testDf, predict("rowid") === testDf("rowid"))
- .where($"target" === $"predicted")
-
- // scalastyle:off println
- println(s"func:${func} precision:${(eval.count + 0.0) / predict.count}")
-
- testDf.unpersist()
- predict.unpersist()
- }
- }
-}
-
-object FileIterator {
-
- def apply[A](f: => A): Iterator[A] = new Iterator[A] {
- var opt = Option(f)
- def hasNext = opt.nonEmpty
- def next() = {
- val r = opt.get
- opt = Option(f)
- r
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
deleted file mode 100644
index 5cc590a..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.streaming
-
-import reflect.ClassTag
-
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.streaming.HivemallStreamingOps._
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.test.HivemallQueryTest
-
-/**
- * This is an input stream just for tests.
- */
-private[this] class TestInputStream[T: ClassTag](
- ssc: StreamingContext,
- input: Seq[Seq[T]],
- numPartitions: Int) extends InputDStream[T](ssc) {
-
- override def start() {}
-
- override def stop() {}
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- logInfo("Computing RDD for time " + validTime)
- val index = ((validTime - zeroTime) / slideDuration - 1).toInt
- val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
- // lets us test cases where RDDs are not created
- if (selectedInput == null) {
- return None
- }
-
- // Report the input data's information to InputInfoTracker for testing
- val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
- ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
- val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
- logInfo("Created RDD " + rdd.id + " with " + selectedInput)
- Some(rdd)
- }
-}
-
-final class HivemallOpsSuite extends HivemallQueryTest {
-
- // This implicit value used in `HivemallStreamingOps`
- implicit val sqlCtx = sqlContext
-
- /**
- * Run a block of code with the given StreamingContext.
- * This method do not stop a given SparkContext because other tests share the context.
- */
- private def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): Unit = {
- try {
- block(ssc)
- ssc.start()
- ssc.awaitTerminationOrTimeout(10 * 1000) // 10s wait
- } finally {
- try {
- ssc.stop(stopSparkContext = false)
- } catch {
- case e: Exception => logError("Error stopping StreamingContext", e)
- }
- }
- }
-
- test("streaming") {
- import sqlCtx.implicits._
-
- // We assume we build a model in advance
- val testModel = Seq(
- ("0", 0.3f), ("1", 0.1f), ("2", 0.6f), ("3", 0.2f)
- ).toDF("feature", "weight")
-
- withStreamingContext(new StreamingContext(sqlCtx.sparkContext, Milliseconds(100))) { ssc =>
- val inputData = Seq(
- Seq(HivemallLabeledPoint(features = "1:0.6" :: "2:0.1" :: Nil)),
- Seq(HivemallLabeledPoint(features = "2:0.9" :: Nil)),
- Seq(HivemallLabeledPoint(features = "1:0.2" :: Nil)),
- Seq(HivemallLabeledPoint(features = "2:0.1" :: Nil)),
- Seq(HivemallLabeledPoint(features = "0:0.6" :: "2:0.4" :: Nil))
- )
-
- val inputStream = new TestInputStream[HivemallLabeledPoint](ssc, inputData, 1)
-
- // Apply predictions on input streams
- val prediction = inputStream.predict { streamDf =>
- val df = streamDf.select(rowid(), $"features").explode_array($"features")
- val testDf = df.select(
- // TODO: `$"feature"` throws AnalysisException, why?
- $"rowid", extract_feature(df("feature")), extract_weight(df("feature"))
- )
- testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
- .select($"rowid", ($"weight" * $"value").as("value"))
- .groupby("rowid").sum("value")
- .as("rowid", "value")
- .select($"rowid", sigmoid($"value"))
- }
-
- // Dummy output stream
- prediction.foreachRDD(_ => {})
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
deleted file mode 100644
index 020cf8c..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.collection.mutable.Seq
-
-import hivemall.tools.RegressionDatagen
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.functions
-import org.apache.spark.sql.hive.HivemallOps
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.test.TestUtils._
-
-/**
- * Base class for a shared SparkContext in test uses
- */
-abstract class HivemallQueryTest extends QueryTest with TestHiveSingleton {
-
- import hiveContext.implicits._
-
- protected val DummyInputData =
- Seq(
- (0, 0), (1, 1), (2, 2), (3, 3)
- ).toDF("c0", "c1")
-
- protected val IntList2Data =
- Seq(
- (8 :: 5 :: Nil, 6 :: 4 :: Nil),
- (3 :: 1 :: Nil, 3 :: 2 :: Nil),
- (2 :: Nil, 3 :: Nil)
- ).toDF("target", "predict")
-
- protected val Float2Data =
- Seq(
- (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f)
- ).toDF("target", "predict")
-
- protected val TinyTrainData =
- Seq(
- (0.0, "1:0.8" :: "2:0.2" :: Nil),
- (1.0, "2:0.7" :: Nil),
- (0.0, "1:0.9" :: Nil)
- ).toDF("label", "features")
-
- protected val TinyTestData =
- Seq(
- (0.0, "1:0.6" :: "2:0.1" :: Nil),
- (1.0, "2:0.9" :: Nil),
- (0.0, "1:0.2" :: Nil),
- (0.0, "2:0.1" :: Nil),
- (0.0, "0:0.6" :: "2:0.4" :: Nil)
- ).toDF("label", "features")
-
- protected val LargeRegrTrainData = RegressionDatagen.exec(
- hiveContext,
- n_partitions = 2,
- min_examples = 100000,
- seed = 3,
- prob_one = 0.8f
- ).cache
-
- protected val LargeRegrTestData = RegressionDatagen.exec(
- hiveContext,
- n_partitions = 2,
- min_examples = 100,
- seed = 3,
- prob_one = 0.5f
- ).cache
-
- protected val LargeClassifierTrainData = RegressionDatagen.exec(
- hiveContext,
- n_partitions = 2,
- min_examples = 100000,
- seed = 5,
- prob_one = 0.8f,
- cl = true
- ).cache
-
- protected val LargeClassifierTestData = RegressionDatagen.exec(
- hiveContext,
- n_partitions = 2,
- min_examples = 100,
- seed = 5,
- prob_one = 0.5f,
- cl = true
- ).cache
-
- protected def checkRegrPrecision(func: String): Unit = {
- // Build a model
- val model = {
- val res = invokeFunc(new HivemallOps(LargeRegrTrainData),
- func, Seq(add_bias($"features"), $"label"))
- if (!res.columns.contains("conv")) {
- res.groupby("feature").agg("weight" -> "avg")
- } else {
- res.groupby("feature").argmin_kld("weight", "conv")
- }
- }.as("feature", "weight")
-
- // Data preparation
- val testDf = LargeRegrTrainData
- .select(rowid(), $"label".as("target"), $"features")
- .cache
-
- val testDf_exploded = testDf
- .explode_array($"features")
- .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
- // Do prediction
- val predict = testDf_exploded
- .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
- .select($"rowid", ($"weight" * $"value").as("value"))
- .groupby("rowid").sum("value")
- .as("rowid", "predicted")
-
- // Evaluation
- val eval = predict
- .join(testDf, predict("rowid") === testDf("rowid"))
- .groupby()
- .agg(Map("target" -> "avg", "predicted" -> "avg"))
- .as("target", "predicted")
-
- val diff = eval.map {
- case Row(target: Double, predicted: Double) =>
- Math.abs(target - predicted)
- }.first
-
- TestUtils.expectResult(diff > 0.10,
- s"Low precision -> func:${func} diff:${diff}")
- }
-
- protected def checkClassifierPrecision(func: String): Unit = {
- // Build a model
- val model = {
- val res = invokeFunc(new HivemallOps(LargeClassifierTrainData),
- func, Seq(add_bias($"features"), $"label"))
- if (!res.columns.contains("conv")) {
- res.groupby("feature").agg("weight" -> "avg")
- } else {
- res.groupby("feature").argmin_kld("weight", "conv")
- }
- }.as("feature", "weight")
-
- // Data preparation
- val testDf = LargeClassifierTestData
- .select(rowid(), $"label".as("target"), $"features")
- .cache
-
- val testDf_exploded = testDf
- .explode_array($"features")
- .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
- // Do prediction
- val predict = testDf_exploded
- .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
- .select($"rowid", ($"weight" * $"value").as("value"))
- .groupby("rowid").sum("value")
- /**
- * TODO: This sentence throws an exception below:
- *
- * WARN Column: Constructing trivially true equals predicate, 'rowid#1323 = rowid#1323'.
- * Perhaps you need to use aliases.
- */
- // .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0)
- // .as("predicted"))
- .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
- .as("rowid", "predicted")
-
- // Evaluation
- val eval = predict
- .join(testDf, predict("rowid") === testDf("rowid"))
- .where($"target" === $"predicted")
-
- val precision = (eval.count + 0.0) / predict.count
-
- TestUtils.expectResult(precision < 0.70,
- s"Low precision -> func:${func} value:${precision}")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
deleted file mode 100644
index 9c77fae..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.reflect.runtime.{universe => ru}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.DataFrame
-
-object TestUtils extends Logging {
-
- // Do benchmark if INFO-log enabled
- def benchmark(benchName: String)(testFunc: => Unit): Unit = {
- if (log.isDebugEnabled) {
- testFunc
- }
- }
-
- def expectResult(res: Boolean, errMsg: String): Unit = if (res) {
- logWarning(errMsg)
- }
-
- def invokeFunc(cls: Any, func: String, args: Any*): DataFrame = try {
- // Invoke a function with the given name via reflection
- val im = scala.reflect.runtime.currentMirror.reflect(cls)
- val mSym = im.symbol.typeSignature.member(ru.newTermName(func)).asMethod
- im.reflectMethod(mSym).apply(args: _*)
- .asInstanceOf[DataFrame]
- } catch {
- case e: Exception =>
- assert(false, s"Invoking ${func} failed because: ${e.getMessage}")
- null // Not executed
- }
-}
-
-// TODO: Any same function in o.a.spark.*?
-class TestDoubleWrapper(d: Double) {
- // Check an equality between Double values
- def ~==(d: Double): Boolean = Math.abs(this.d - d) < 0.001
-}
-
-object TestDoubleWrapper {
- @inline implicit def toTestDoubleWrapper(d: Double): TestDoubleWrapper = {
- new TestDoubleWrapper(d)
- }
-}
[3/3] incubator-hivemall git commit: Close #67: [HIVEMALL-55][SPARK]
Drop the Spark-v1.6 support
Posted by my...@apache.org.
Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/c53b9ff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/c53b9ff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/c53b9ff9
Branch: refs/heads/master
Commit: c53b9ff9b23755c7d211390ea97bc911a80674bf
Parents: 8dc3a02
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Apr 10 06:43:20 2017 +0900
Committer: myui <yu...@gmail.com>
Committed: Mon Apr 10 06:43:20 2017 +0900
----------------------------------------------------------------------
.travis.yml | 3 +-
NOTICE | 1 -
bin/format_header.sh | 3 -
pom.xml | 11 -
spark/spark-1.6/bin/mvn-zinc | 99 --
spark/spark-1.6/extra-src/README | 1 -
.../org/apache/spark/sql/hive/HiveShim.scala | 527 --------
spark/spark-1.6/pom.xml | 261 ----
.../src/main/resources/log4j.properties | 12 -
.../hivemall/tools/RegressionDatagen.scala | 66 -
.../apache/spark/sql/hive/GroupedDataEx.scala | 303 -----
.../org/apache/spark/sql/hive/HivemallOps.scala | 1125 ------------------
.../apache/spark/sql/hive/HivemallUtils.scala | 112 --
.../src/test/resources/log4j.properties | 7 -
.../hivemall/mix/server/MixServerSuite.scala | 123 --
.../hivemall/tools/RegressionDatagenSuite.scala | 32 -
.../scala/org/apache/spark/SparkFunSuite.scala | 48 -
.../ml/feature/HivemallLabeledPointSuite.scala | 35 -
.../scala/org/apache/spark/sql/QueryTest.scala | 295 -----
.../spark/sql/catalyst/plans/PlanTest.scala | 61 -
.../apache/spark/sql/hive/HiveUdfSuite.scala | 113 --
.../spark/sql/hive/HivemallOpsSuite.scala | 665 -----------
.../spark/sql/hive/ModelMixingSuite.scala | 272 -----
.../spark/streaming/HivemallOpsSuite.scala | 123 --
.../apache/spark/test/HivemallQueryTest.scala | 197 ---
.../scala/org/apache/spark/test/TestUtils.scala | 62 -
26 files changed, 1 insertion(+), 4556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a4cb7ea..323e36a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,9 +35,8 @@ notifications:
script:
- mvn -q scalastyle:check test -Pspark-2.1
- # test the spark-1.6/2.0 modules only in the following runs
+ # test the spark-2.0 modules only in the following runs
- mvn -q scalastyle:check clean -Pspark-2.0 -pl spark/spark-2.0 -am test -Dtest=none
- - mvn -q scalastyle:check clean -Pspark-1.6 -pl spark/spark-1.6 -am test -Dtest=none
after_success:
- mvn clean cobertura:cobertura coveralls:report
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 0911f50..79a944c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -50,7 +50,6 @@ o hivemall/core/src/main/java/hivemall/utils/buffer/DynamicByteArray.java
https://orc.apache.org/
Licensed under the Apache License, Version 2.0
-o hivemall/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
hivemall/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/QueryTest.scala
hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/bin/format_header.sh
----------------------------------------------------------------------
diff --git a/bin/format_header.sh b/bin/format_header.sh
index da67420..f2c063b 100755
--- a/bin/format_header.sh
+++ b/bin/format_header.sh
@@ -37,8 +37,5 @@ mvn license:format
cd $HIVEMALL_HOME/spark/spark-common
mvn license:format -P spark-2.0
-cd $HIVEMALL_HOME/spark/spark-1.6
-mvn license:format -P spark-1.6
-
cd $HIVEMALL_HOME/spark/spark-2.0
mvn license:format -P spark-2.0
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2276e9..7743d5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -293,17 +293,6 @@
</properties>
</profile>
<profile>
- <id>spark-1.6</id>
- <modules>
- <module>spark/spark-1.6</module>
- <module>spark/spark-common</module>
- </modules>
- <properties>
- <spark.version>1.6.2</spark.version>
- <spark.binary.version>1.6</spark.binary.version>
- </properties>
- </profile>
- <profile>
<id>compile-xgboost</id>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/bin/mvn-zinc
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/bin/mvn-zinc b/spark/spark-1.6/bin/mvn-zinc
deleted file mode 100755
index 759b0a5..0000000
--- a/spark/spark-1.6/bin/mvn-zinc
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Copyed from commit 48682f6bf663e54cb63b7e95a4520d34b6fa890b in Apache Spark
-
-# Determine the current working directory
-_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-# Preserve the calling directory
-_CALLING_DIR="$(pwd)"
-# Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
-
-# Installs any application tarball given a URL, the expected tarball name,
-# and, optionally, a checkable binary path to determine if the binary has
-# already been installed
-## Arg1 - URL
-## Arg2 - Tarball Name
-## Arg3 - Checkable Binary
-install_app() {
- local remote_tarball="$1/$2"
- local local_tarball="${_DIR}/$2"
- local binary="${_DIR}/$3"
- local curl_opts="--progress-bar -L"
- local wget_opts="--progress=bar:force ${wget_opts}"
-
- if [ -z "$3" -o ! -f "$binary" ]; then
- # check if we already have the tarball
- # check if we have curl installed
- # download application
- [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
- echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \
- curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
- # if the file still doesn't exist, lets try `wget` and cross our fingers
- [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
- echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \
- wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
- # if both were unsuccessful, exit
- [ ! -f "${local_tarball}" ] && \
- echo -n "ERROR: Cannot download $2 with cURL or wget; " && \
- echo "please install manually and try again." && \
- exit 2
- cd "${_DIR}" && tar -xzf "$2"
- rm -rf "$local_tarball"
- fi
-}
-
-# Install zinc under the bin/ folder
-install_zinc() {
- local zinc_path="zinc-0.3.9/bin/zinc"
- [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
- install_app \
- "http://downloads.typesafe.com/zinc/0.3.9" \
- "zinc-0.3.9.tgz" \
- "${zinc_path}"
- ZINC_BIN="${_DIR}/${zinc_path}"
-}
-
-# Setup healthy defaults for the Zinc port if none were provided from
-# the environment
-ZINC_PORT=${ZINC_PORT:-"3030"}
-
-# Install Zinc for the bin/
-install_zinc
-
-# Reset the current working directory
-cd "${_CALLING_DIR}"
-
-# Now that zinc is ensured to be installed, check its status and, if its
-# not running or just installed, start it
-if [ ! -f "${ZINC_BIN}" ]; then
- exit -1
-fi
-if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
- export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
- "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
- "${ZINC_BIN}" -start -port ${ZINC_PORT} &>/dev/null
-fi
-
-# Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
-
-# Last, call the `mvn` command as usual
-mvn -DzincPort=${ZINC_PORT} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/README
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/README b/spark/spark-1.6/extra-src/README
deleted file mode 100644
index 8b5d0cd..0000000
--- a/spark/spark-1.6/extra-src/README
+++ /dev/null
@@ -1 +0,0 @@
-Copyed from spark master [commit 908e37bcc10132bb2aa7f80ae694a9df6e40f31a]
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
deleted file mode 100644
index 6da54f7..0000000
--- a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.client
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
-import java.net.URI
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{IntegralType, StringType}
-
-/**
- * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
- * talk to the metastore. Each Hive version has its own implementation of this class, defining
- * version-specific version of needed functions.
- *
- * The guideline for writing shims is:
- * - always extend from the previous version unless really not possible
- * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to
- * avoid runtime errors due to the above guideline.
- */
-private[client] sealed abstract class Shim {
-
- /**
- * Set the current SessionState to the given SessionState. Also, set the context classloader of
- * the current thread to the one set in the HiveConf of this given `state`.
- * @param state
- */
- def setCurrentSessionState(state: SessionState): Unit
-
- /**
- * This shim is necessary because the return type is different on different versions of Hive.
- * All parameters are the same, though.
- */
- def getDataLocation(table: Table): Option[String]
-
- def setDataLocation(table: Table, loc: String): Unit
-
- def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
-
- def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
- def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
-
- def getDriverResults(driver: Driver): Seq[String]
-
- def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
-
- def loadPartition(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit
-
- def loadTable(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- replace: Boolean,
- holdDDLTime: Boolean): Unit
-
- def loadDynamicPartitions(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit
-
- def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
-
- protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
- val method = findMethod(klass, name, args: _*)
- require(Modifier.isStatic(method.getModifiers()),
- s"Method $name of class $klass is not static.")
- method
- }
-
- protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
- klass.getMethod(name, args: _*)
- }
-
-}
-
-private[client] class Shim_v0_12 extends Shim with Logging {
-
- private lazy val startMethod =
- findStaticMethod(
- classOf[SessionState],
- "start",
- classOf[SessionState])
- private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation")
- private lazy val setDataLocationMethod =
- findMethod(
- classOf[Table],
- "setDataLocation",
- classOf[URI])
- private lazy val getAllPartitionsMethod =
- findMethod(
- classOf[Hive],
- "getAllPartitionsForPruner",
- classOf[Table])
- private lazy val getCommandProcessorMethod =
- findStaticMethod(
- classOf[CommandProcessorFactory],
- "get",
- classOf[String],
- classOf[HiveConf])
- private lazy val getDriverResultsMethod =
- findMethod(
- classOf[Driver],
- "getResults",
- classOf[JArrayList[String]])
- private lazy val loadPartitionMethod =
- findMethod(
- classOf[Hive],
- "loadPartition",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadTableMethod =
- findMethod(
- classOf[Hive],
- "loadTable",
- classOf[Path],
- classOf[String],
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val loadDynamicPartitionsMethod =
- findMethod(
- classOf[Hive],
- "loadDynamicPartitions",
- classOf[Path],
- classOf[String],
- classOf[JMap[String, String]],
- JBoolean.TYPE,
- JInteger.TYPE,
- JBoolean.TYPE,
- JBoolean.TYPE)
- private lazy val dropIndexMethod =
- findMethod(
- classOf[Hive],
- "dropIndex",
- classOf[String],
- classOf[String],
- classOf[String],
- JBoolean.TYPE)
-
- override def setCurrentSessionState(state: SessionState): Unit = {
- // Starting from Hive 0.13, setCurrentSessionState will internally override
- // the context class loader of the current thread by the class loader set in
- // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same
- // behavior and make shim.setCurrentSessionState of all Hive versions have the
- // consistent behavior.
- Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
- startMethod.invoke(null, state)
- }
-
- override def getDataLocation(table: Table): Option[String] =
- Option(getDataLocationMethod.invoke(table)).map(_.toString())
-
- override def setDataLocation(table: Table, loc: String): Unit =
- setDataLocationMethod.invoke(table, new URI(loc))
-
- override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
- getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
- override def getPartitionsByFilter(
- hive: Hive,
- table: Table,
- predicates: Seq[Expression]): Seq[Partition] = {
- // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
- // See HIVE-4888.
- logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
- "Please use Hive 0.13 or higher.")
- getAllPartitions(hive, table)
- }
-
- override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
- getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
-
- override def getDriverResults(driver: Driver): Seq[String] = {
- val res = new JArrayList[String]()
- getDriverResultsMethod.invoke(driver, res)
- res.asScala
- }
-
- override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
- conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
- }
-
- override def loadPartition(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = {
- loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
- }
-
- override def loadTable(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- replace: Boolean,
- holdDDLTime: Boolean): Unit = {
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
- }
-
- override def loadDynamicPartitions(
- hive: Hive,
- loadPath: Path,
- tableName: String,
- partSpec: JMap[String, String],
- replace: Boolean,
- numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit = {
- loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
- }
-
- override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
- }
-
-}
-
-private[client] class Shim_v0_13 extends Shim_v0_12 {
-
-  private lazy val setCurrentSessionStateMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "setCurrentSessionState",
-      classOf[SessionState])
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[Path])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsOf",
-      classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[Array[String]],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JList[Object]])
-
-  override def setCurrentSessionState(state: SessionState): Unit =
-    setCurrentSessionStateMethod.invoke(null, state)
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new Path(loc))
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
-    getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
-  /**
-   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
-   * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
-   *
-   * Unsupported predicates are skipped. Skipped predicates include:
-   *  - null-safe equality (`<=>`), which has no counterpart in the metastore filter syntax;
-   *  - comparisons on CHAR/VARCHAR partition keys;
-   *  - string literals that contain both `"` and `'` and therefore cannot be quoted.
-   */
-  def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.flatMap {
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType))
-          if isPushableComparison(op) =>
-        Some(s"${a.name} ${op.symbol} $v")
-      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute)
-          if isPushableComparison(op) =>
-        Some(s"$v ${op.symbol} ${a.name}")
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
-          if isPushableComparison(op) && !varcharKeys.contains(a.name) =>
-        quoteStringLiteral(v.toString).map(lit => s"${a.name} ${op.symbol} $lit")
-      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
-          if isPushableComparison(op) && !varcharKeys.contains(a.name) =>
-        quoteStringLiteral(v.toString).map(lit => s"$lit ${op.symbol} ${a.name}")
-      case _ =>
-        None
-    }.mkString(" and ")
-  }
-
-  /**
-   * Returns false for Spark's null-safe equality (`EqualNullSafe`, rendered as `<=>`).
-   * The metastore filter parser cannot handle that operator, so it must not be pushed down.
-   * The symbol is compared rather than the class, so no extra import is needed.
-   */
-  private def isPushableComparison(op: BinaryComparison): Boolean = op.symbol != "<=>"
-
-  /**
-   * Quotes `value` as a metastore filter string literal.
-   *
-   * Double quotes are used unless the value contains one; in that case single quotes are used.
-   * A value containing both kinds of quote cannot be represented and yields `None`, so the
-   * predicate is skipped instead of producing a malformed filter.
-   */
-  private def quoteStringLiteral(value: String): Option[String] = {
-    if (!value.contains("\"")) {
-      Some("\"" + value + "\"")
-    } else if (!value.contains("'")) {
-      Some("'" + value + "'")
-    } else {
-      None
-    }
-  }
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates)
-    val partitions =
-      if (filter.isEmpty) {
-        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
-      }
-
-    partitions.asScala.toSeq
-  }
-
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
-
-  /**
-   * Collects the driver's result rows as strings.
-   *
-   * A row is either a `String` or an `Array[Object]`; for an array, its first element is taken.
-   * Any other row type raises a `MatchError`.
-   */
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[Object]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala.map { r =>
-      r match {
-        case s: String => s
-        case a: Array[Object] => a(0).asInstanceOf[String]
-      }
-    }
-  }
-
-}
-
-/**
- * Hive 0.14 shim.
- *
- * Compared with the overloads used by [[Shim_v0_12]], the `loadPartition`, `loadTable` and
- * `loadDynamicPartitions` overloads resolved here take additional trailing boolean arguments:
- *  - `loadPartition` and `loadTable` receive an "is source local" flag (see [[isSrcLocal]]);
- *  - all three receive constant `false` flag(s).
- *
- * The metastore retry delay is read through `HiveConf.getTimeVar`.
- */
-private[client] class Shim_v0_14 extends Shim_v0_13 {
-
-  private lazy val loadPartitionMethod = findMethod(
-    classOf[Hive], "loadPartition",
-    classOf[Path], classOf[String], classOf[JMap[String, String]],
-    JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
-
-  private lazy val loadTableMethod = findMethod(
-    classOf[Hive], "loadTable",
-    classOf[Path], classOf[String],
-    JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
-
-  private lazy val loadDynamicPartitionsMethod = findMethod(
-    classOf[Hive], "loadDynamicPartitions",
-    classOf[Path], classOf[String], classOf[JMap[String, String]],
-    JBoolean.TYPE, JInteger.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
-
-  private lazy val getTimeVarMethod = findMethod(
-    classOf[HiveConf], "getTimeVar",
-    classOf[HiveConf.ConfVars], classOf[TimeUnit])
-
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
-    // The last two arguments are the "is source local" flag and a constant `false`.
-    loadPartitionMethod.invoke(
-      hive,
-      loadPath,
-      tableName,
-      partSpec,
-      Boolean.box(replace),
-      Boolean.box(holdDDLTime),
-      Boolean.box(inheritTableSpecs),
-      Boolean.box(isSkewedStoreAsSubdir),
-      Boolean.box(isSrcLocal(loadPath, hive.getConf())),
-      JBoolean.FALSE)
-  }
-
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
-    // The last three arguments are the "is source local" flag and two constant `false`s.
-    loadTableMethod.invoke(
-      hive,
-      loadPath,
-      tableName,
-      Boolean.box(replace),
-      Boolean.box(holdDDLTime),
-      Boolean.box(isSrcLocal(loadPath, hive.getConf())),
-      JBoolean.FALSE,
-      JBoolean.FALSE)
-  }
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    // One constant `false` is appended to the Shim_v0_12 argument list.
-    loadDynamicPartitionsMethod.invoke(
-      hive,
-      loadPath,
-      tableName,
-      partSpec,
-      Boolean.box(replace),
-      Int.box(numDP),
-      Boolean.box(holdDDLTime),
-      Boolean.box(listBucketingEnabled),
-      JBoolean.FALSE)
-  }
-
-  /** Reads METASTORE_CLIENT_CONNECT_RETRY_DELAY converted to milliseconds via `getTimeVar`. */
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long =
-    getTimeVarMethod
-      .invoke(conf, HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.MILLISECONDS)
-      .asInstanceOf[Long]
-
-  /** True when `path` resolves to a file system with the same URI as the local file system. */
-  protected def isSrcLocal(path: Path, conf: HiveConf): Boolean =
-    FileSystem.getLocal(conf).getUri() == FileSystem.get(path.toUri(), conf).getUri()
-
-}
-
-/** Hive 1.0 shim: it reuses the [[Shim_v0_14]] behavior unchanged. */
-private[client] class Shim_v1_0 extends Shim_v0_14
-
-/**
- * Hive 1.1 shim.
- *
- * `dropIndex` is resolved with one more boolean parameter than the overload called by
- * [[Shim_v0_12]].
- */
-private[client] class Shim_v1_1 extends Shim_v1_0 {
-
-  private lazy val dropIndexMethod = findMethod(
-    classOf[Hive], "dropIndex",
-    classOf[String], classOf[String], classOf[String], JBoolean.TYPE, JBoolean.TYPE)
-
-  /** Drops `indexName` on `dbName`.`tableName`, passing `true` for both boolean flags. */
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit =
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, JBoolean.TRUE, JBoolean.TRUE)
-
-}
-
-/**
- * Hive 1.2 shim.
- *
- * `loadDynamicPartitions` is resolved with an extra trailing `long` parameter compared with
- * [[Shim_v0_14]].
- */
-private[client] class Shim_v1_2 extends Shim_v1_1 {
-
-  private lazy val loadDynamicPartitionsMethod = findMethod(
-    classOf[Hive], "loadDynamicPartitions",
-    classOf[Path], classOf[String], classOf[JMap[String, String]],
-    JBoolean.TYPE, JInteger.TYPE, JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE, JLong.TYPE)
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    // The trailing arguments are a constant `false` followed by a constant `0L`.
-    loadDynamicPartitionsMethod.invoke(
-      hive,
-      loadPath,
-      tableName,
-      partSpec,
-      Boolean.box(replace),
-      Int.box(numDP),
-      Boolean.box(holdDDLTime),
-      Boolean.box(listBucketingEnabled),
-      JBoolean.FALSE,
-      JLong.valueOf(0L))
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/pom.xml b/spark/spark-1.6/pom.xml
deleted file mode 100644
index 98bff6b..0000000
--- a/spark/spark-1.6/pom.xml
+++ /dev/null
@@ -1,261 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>io.github.myui</groupId>
- <artifactId>hivemall</artifactId>
- <version>0.4.2-rc.2</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>hivemall-spark</artifactId>
- <name>Hivemall on Spark 1.6</name>
- <packaging>jar</packaging>
-
- <properties>
- <PermGen>64m</PermGen>
- <MaxPermGen>512m</MaxPermGen>
- <CodeCacheSize>512m</CodeCacheSize>
- <main.basedir>${project.parent.basedir}</main.basedir>
- </properties>
-
- <dependencies>
- <!-- hivemall dependencies -->
- <dependency>
- <groupId>io.github.myui</groupId>
- <artifactId>hivemall-core</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>io.github.myui</groupId>
- <artifactId>hivemall-spark-common</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
-
- <!-- third-party dependencies -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.8</version>
- <scope>compile</scope>
- </dependency>
-
- <!-- other provided dependencies -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies -->
- <dependency>
- <groupId>io.github.myui</groupId>
- <artifactId>hivemall-mixserv</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.xerial</groupId>
- <artifactId>xerial-core</artifactId>
- <version>3.2.3</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>2.2.4</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <directory>target</directory>
- <outputDirectory>target/classes</outputDirectory>
- <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
- <testOutputDirectory>target/test-classes</testOutputDirectory>
- <plugins>
- <!-- For incremental compilation -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile-first</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- <recompileMode>incremental</recompileMode>
- <useZincServer>true</useZincServer>
- <args>
- <arg>-unchecked</arg>
- <arg>-deprecation</arg>
- <!-- TODO: To enable this option, we need to fix many warnings -->
- <!-- <arg>-feature</arg> -->
- </args>
- <jvmArgs>
- <jvmArg>-Xms1024m</jvmArg>
- <jvmArg>-Xmx1024m</jvmArg>
- <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
- <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
- <jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
- <!-- hivemall-spark_xx-xx.jar -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.5</version>
- <configuration>
- <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
- <outputDirectory>${project.parent.build.directory}</outputDirectory>
- </configuration>
- </plugin>
- <!-- hivemall-spark_xx-xx-with-dependencies.jar including minimum dependencies -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <id>jar-with-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}-with-dependencies</finalName>
- <outputDirectory>${project.parent.build.directory}</outputDirectory>
- <minimizeJar>false</minimizeJar>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <artifactSet>
- <includes>
- <include>io.github.myui:hivemall-core</include>
- <include>io.github.myui:hivemall-spark-common</include>
- <include>com.github.haifengl:smile-core</include>
- <include>com.github.haifengl:smile-math</include>
- <include>com.github.haifengl:smile-data</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- disable surefire because there are no Java tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7</version>
- <configuration>
- <skipTests>true</skipTests>
- </configuration>
- </plugin>
- <!-- then, enable scalatest -->
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <!-- Note config is repeated in surefire config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>SparkTestSuite.txt</filereports>
- <argLine>-ea -Xmx2g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
- <stderr/>
- <environmentVariables>
- <SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
- <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
- <SPARK_TESTING>1</SPARK_TESTING>
- <JAVA_HOME>${env.JAVA_HOME}</JAVA_HOME>
- </environmentVariables>
- <systemProperties>
- <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
- <derby.system.durability>test</derby.system.durability>
- <java.awt.headless>true</java.awt.headless>
- <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
- <spark.testing>1</spark.testing>
- <spark.ui.enabled>false</spark.ui.enabled>
- <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
- <spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
- <!-- Needed by sql/hive tests. -->
- <test.src.tables>__not_used__</test.src.tables>
- </systemProperties>
- <tagsToExclude>${test.exclude.tags}</tagsToExclude>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/resources/log4j.properties b/spark/spark-1.6/src/main/resources/log4j.properties
deleted file mode 100644
index 72bf5b6..0000000
--- a/spark/spark-1.6/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.eclipse.jetty=INFO
-log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala b/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
deleted file mode 100644
index 01664f4..0000000
--- a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools
-
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.types._
-
-object RegressionDatagen {
-
-  /**
-   * Generate data for regression/classification.
-   * See [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]]
-   * for the details of arguments below.
-   *
-   * A seed DataFrame with one row in each of `n_partitions` partitions is built. Then
-   * `lr_datagen` is applied to it, requesting `ceil(min_examples / n_partitions)` examples
-   * per row.
-   *
-   * @param sc SQL context used to build the seed DataFrame
-   * @param n_partitions number of partitions (one seed row each); must be positive
-   * @param min_examples total number of examples requested across partitions; must be positive
-   * @param n_features passed as `-n_features`; must be positive
-   * @param n_dims passed as `-n_dims`; must be positive
-   * @param seed NOTE(review): not forwarded to `lr_datagen`, so it currently has no effect
-   * @param dense appends `-dense` when true
-   * @param prob_one passed as `-prob_one`
-   * @param sort appends `-sort` when true
-   * @param cl appends `-cl` when true
-   * @return a DataFrame of `label` (cast to double) and `features`
-   * @throws IllegalArgumentException if a count/size argument is not positive
-   */
-  def exec(sc: SQLContext,
-           n_partitions: Int = 2,
-           min_examples: Int = 1000,
-           n_features: Int = 10,
-           n_dims: Int = 200,
-           seed: Int = 43,
-           dense: Boolean = false,
-           prob_one: Float = 0.6f,
-           sort: Boolean = false,
-           cl: Boolean = false): DataFrame = {
-
-    // The checks are strict (> 0), so the messages must say "positive", not "non-negative".
-    require(n_partitions > 0, "Positive #n_partitions required.")
-    require(min_examples > 0, "Positive #min_examples required.")
-    require(n_features > 0, "Positive #n_features required.")
-    require(n_dims > 0, "Positive #n_dims required.")
-
-    // Calculate #examples to generate in each partition (ceiling division)
-    val n_examples = (min_examples + n_partitions - 1) / n_partitions
-
-    val df = sc.createDataFrame(
-      sc.sparkContext.parallelize((0 until n_partitions).map(Row(_)), n_partitions),
-      StructType(
-        StructField("data", IntegerType, true) ::
-        Nil)
-    )
-
-    val options =
-      s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one" +
-        (if (dense) " -dense" else "") +
-        (if (sort) " -sort" else "") +
-        (if (cl) " -cl" else "")
-
-    import sc.implicits._
-    df.lr_datagen(options)
-      .select($"label".cast(DoubleType).as("label"), $"features")
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
deleted file mode 100644
index 18ef9df..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{AnalysisException, DataFrame, GroupedData}
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Cube, Pivot, Rollup}
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-
-final class GroupedDataEx protected[sql](
-    df: DataFrame,
-    groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType)
-  extends GroupedData(df, groupingExprs, groupType) {
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  // toDF, alias, and strToExpr are copied from the base class, GroupedData, because
-  // these methods have 'private[this]' modifiers.
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
-    val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
-      groupingExprs ++ aggExprs
-    } else {
-      aggExprs
-    }
-
-    val aliasedAgg = aggregates.map(alias)
-
-    groupType match {
-      case GroupedData.GroupByType =>
-        DataFrame(
-          df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
-      case GroupedData.RollupType =>
-        DataFrame(
-          df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.CubeType =>
-        DataFrame(
-          df.sqlContext, Cube(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.PivotType(pivotCol, values) =>
-        val aliasedGrps = groupingExprs.map(alias)
-        DataFrame(
-          df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
-    }
-  }
-
-  private[this] def alias(expr: Expression): NamedExpression = expr match {
-    case u: UnresolvedAttribute => UnresolvedAlias(u)
-    case expr: NamedExpression => expr
-    case expr: Expression => Alias(expr, expr.prettyString)()
-  }
-
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    val exprToFunc: (Expression => Expression) = {
-      (inputExpr: Expression) => expr.toLowerCase match {
-        case "avg" | "average" | "mean" =>
-          UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
-        case "stddev" | "std" =>
-          UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
-        // Also special handle count because we need to take care count(*).
-        case "count" | "size" =>
-          // Turn count(*) into count(1)
-          inputExpr match {
-            case s: Star => Count(Literal(1)).toAggregateExpression()
-            case _ => Count(inputExpr).toAggregateExpression()
-          }
-        case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false)
-      }
-    }
-    (inputExpr: Expression) => exprToFunc(inputExpr)
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  // Helpers shared by the Hivemall aggregations below.
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Builds an aggregate expression evaluating the Hive UDAF `className` over `args`.
-   *
-   * The result is always wrapped with `toAggregateExpression()` so that the analyzer treats it
-   * as an aggregate rather than as a plain expression over non-grouped columns.
-   */
-  private[this] def hiveUDAF(
-      className: String,
-      args: Seq[Expression],
-      isUDAFBridgeRequired: Boolean): Expression = {
-    HiveUDAFFunction(
-      new HiveFunctionWrapper(className),
-      args,
-      isUDAFBridgeRequired = isUDAFBridgeRequired
-    ).toAggregateExpression()
-  }
-
-  /**
-   * Aggregates each group with the Hive UDAF `className` applied to the columns `colNames`.
-   * The output column is aliased with the expression's pretty string.
-   */
-  private[this] def aggregateByHiveUDAF(
-      className: String,
-      colNames: Seq[String],
-      isUDAFBridgeRequired: Boolean): DataFrame = {
-    val udaf = hiveUDAF(className, colNames.map(df.col(_).expr), isUDAFBridgeRequired)
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  // `agg` only supports aggregation functions with a single argument
-  override def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
-      val a = expr match {
-        case "voted_avg" =>
-          hiveUDAF("hivemall.ensemble.bagging.VotedAvgUDAF",
-            Seq(df.col(colName).expr), isUDAFBridgeRequired = true)
-        case "weight_voted_avg" =>
-          hiveUDAF("hivemall.ensemble.bagging.WeightVotedAvgUDAF",
-            Seq(df.col(colName).expr), isUDAFBridgeRequired = true)
-        case "rf_ensemble" =>
-          // NOTE(review): `rf_ensemble(...)` below uses isUDAFBridgeRequired = false for the
-          // same UDAF; confirm which one matches RandomForestEnsembleUDAF's base class.
-          hiveUDAF("hivemall.smile.tools.RandomForestEnsembleUDAF",
-            Seq(df.col(colName).expr), isUDAFBridgeRequired = true)
-        case _ =>
-          strToExpr(expr)(df(colName).expr)
-      }
-      Alias(a, a.prettyString)()
-    }.toSeq)
-  }
-
-  /** Throws an AnalysisException unless column `colName` resolves to exactly `expected`. */
-  private[this] def checkType(colName: String, expected: DataType) = {
-    val dataType = df.resolve(colName).dataType
-    if (dataType != expected) {
-      throw new AnalysisException(
-        s""""$colName" must be $expected, however it is $dataType""")
-    }
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.VotedAvgUDAF
-   */
-  def voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    aggregateByHiveUDAF(
-      "hivemall.ensemble.bagging.VotedAvgUDAF", Seq(weight), isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF
-   */
-  def weight_voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    aggregateByHiveUDAF(
-      "hivemall.ensemble.bagging.WeightVotedAvgUDAF", Seq(weight), isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.ensemble.ArgminKLDistanceUDAF
-   */
-  def argmin_kld(weight: String, conv: String): DataFrame = {
-    // checkType(weight, NumericType)
-    // checkType(conv, NumericType)
-    aggregateByHiveUDAF(
-      "hivemall.ensemble.ArgminKLDistanceUDAF", Seq(weight, conv), isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxValueLabelUDAF
-   */
-  def max_label(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    aggregateByHiveUDAF(
-      "hivemall.ensemble.MaxValueLabelUDAF", Seq(score, label), isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxRowUDAF
-   */
-  def maxrow(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    aggregateByHiveUDAF(
-      "hivemall.ensemble.MaxRowUDAF", Seq(score, label), isUDAFBridgeRequired = false)
-  }
-
-  /**
-   * @see hivemall.smile.tools.RandomForestEnsembleUDAF
-   */
-  def rf_ensemble(predict: String): DataFrame = {
-    // checkType(predict, NumericType)
-    // NOTE(review): `agg(Map(... -> "rf_ensemble"))` uses isUDAFBridgeRequired = true for the
-    // same UDAF; confirm which one matches RandomForestEnsembleUDAF's base class.
-    aggregateByHiveUDAF(
-      "hivemall.smile.tools.RandomForestEnsembleUDAF", Seq(predict),
-      isUDAFBridgeRequired = false)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanAbsoluteErrorUDAF
-   */
-  def mae(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    aggregateByHiveUDAF(
-      "hivemall.evaluation.MeanAbsoluteErrorUDAF", Seq(predict, target),
-      isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanSquaredErrorUDAF
-   */
-  def mse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    aggregateByHiveUDAF(
-      "hivemall.evaluation.MeanSquaredErrorUDAF", Seq(predict, target),
-      isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.evaluation.RootMeanSquaredErrorUDAF
-   */
-  def rmse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    aggregateByHiveUDAF(
-      "hivemall.evaluation.RootMeanSquaredErrorUDAF", Seq(predict, target),
-      isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.evaluation.FMeasureUDAF
-   */
-  def f1score(predict: String, target: String): DataFrame = {
-    // checkType(target, ArrayType(IntegerType))
-    // checkType(predict, ArrayType(IntegerType))
-    aggregateByHiveUDAF(
-      "hivemall.evaluation.FMeasureUDAF", Seq(predict, target), isUDAFBridgeRequired = true)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.OnehotEncodingUDAF
-   */
-  def onehot_encoding(features: String*): DataFrame = {
-    // Wrapped via toAggregateExpression() like every other UDAF here; the original passed the
-    // bare HiveUDAFFunction to toDF.
-    aggregateByHiveUDAF(
-      "hivemall.ftvec.trans.OnehotEncodingUDAF", features, isUDAFBridgeRequired = false)
-  }
-
-  /**
-   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
-   */
-  def snr(X: String, Y: String): DataFrame = {
-    aggregateByHiveUDAF(
-      "hivemall.ftvec.selection.SignalNoiseRatioUDAF", Seq(X, Y), isUDAFBridgeRequired = false)
-  }
-
-  /**
-   * @see hivemall.tools.matrix.TransposeAndDotUDAF
-   */
-  def transpose_and_dot(X: String, Y: String): DataFrame = {
-    aggregateByHiveUDAF(
-      "hivemall.tools.matrix.TransposeAndDotUDAF", Seq(X, Y), isUDAFBridgeRequired = false)
-  }
-}
[2/3] incubator-hivemall git commit: Close #67: [HIVEMALL-55][SPARK]
Drop the Spark-v1.6 support
Posted by my...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
deleted file mode 100644
index 8583e1c..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ /dev/null
@@ -1,1125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.util.UUID
-
-import org.apache.spark.Logging
-import org.apache.spark.ml.feature.HivemallFeature
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-
-/**
- * Hivemall wrapper and some utility functions for DataFrame.
- *
- * @groupname regression
- * @groupname classifier
- * @groupname classifier.multiclass
- * @groupname ensemble
- * @groupname knn.similarity
- * @groupname knn.distance
- * @groupname knn.lsh
- * @groupname ftvec
- * @groupname ftvec.amplify
- * @groupname ftvec.hashing
- * @groupname ftvec.scaling
- * @groupname ftvec.conv
- * @groupname ftvec.trans
- * @groupname misc
- */
-final class HivemallOps(df: DataFrame) extends Logging {
-
-  /**
-   * An implicit conversion to avoid doing annoying transformation.
-   */
-  @inline
-  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame =
-    DataFrame(df.sqlContext, logicalPlan)
-
-  /**
-   * Builds a [[Generate]] plan that evaluates the Hive UDTF `className` over `args`
-   * and names its output columns `outputNames`.
-   *
-   * All UDTF wrappers in this class share this shape: `join = false`,
-   * `outer = false` and no qualifier.
-   *
-   * @param className fully-qualified class name of the Hive UDTF
-   * @param outputNames names given to the UDTF output columns, in order
-   * @param args arguments passed to the UDTF
-   * @param child plan the UDTF runs over; defaults to this DataFrame's plan
-   */
-  private[this] def hiveUDTF(
-      className: String,
-      outputNames: Seq[String],
-      args: Seq[Column],
-      child: LogicalPlan = df.logicalPlan): DataFrame = {
-    Generate(
-      HiveGenericUDTF(new HiveFunctionWrapper(className), args.map(_.expr)),
-      join = false, outer = false, None,
-      outputNames.map(UnresolvedAttribute(_)),
-      child)
-  }
-
-  /**
-   * Output column names for the amplifier UDTFs: a named column keeps its name,
-   * any other expression is named after its pretty-printed form.
-   */
-  private[this] def columnNames(cols: Seq[Column]): Seq[String] = cols.map {
-    case Column(expr: NamedExpression) => expr.name
-    case Column(expr: Expression) => expr.prettyString
-  }
-
-  /**
-   * @see hivemall.regression.AdaDeltaUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_adadelta(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.AdaDeltaUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.AdaGradUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_adagrad(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.AdaGradUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arow_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.AROWRegressionUDTF",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arowe_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.AROWRegressionUDTF$AROWe",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe2
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arowe2_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.AROWRegressionUDTF$AROWe2",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.LogressUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_logregr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.LogressUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa1_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.PassiveAggressiveRegressionUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa1a_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF$PA2
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa2_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa2a_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.smile.regression.RandomForestRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_randomforest_regr(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.smile.regression.RandomForestRegressionUDTF",
-      Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests"),
-      setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.PerceptronUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_perceptron(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.PerceptronUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.PassiveAggressiveUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA1
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa1(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.PassiveAggressiveUDTF$PA1",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA2
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa2(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.PassiveAggressiveUDTF$PA2",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.ConfidenceWeightedUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_cw(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.ConfidenceWeightedUDTF",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.AROWClassifierUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_arow(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.AROWClassifierUDTF",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.AROWClassifierUDTF$AROWh
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_arowh(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.AROWClassifierUDTF$AROWh",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_scw(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.SoftConfideceWeightedUDTF$SCW1",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW2
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_scw2(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.SoftConfideceWeightedUDTF$SCW2",
-      Seq("feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.AdaGradRDAUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_adagrad_rda(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.AdaGradRDAUDTF",
-      Seq("feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.smile.classification.RandomForestClassifierUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_randomforest_classifier(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.smile.classification.RandomForestClassifierUDTF",
-      Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests"),
-      setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPerceptronUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_perceptron(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassPerceptronUDTF",
-      Seq("label", "feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF",
-      Seq("label", "feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa1(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1",
-      Seq("label", "feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa2(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2",
-      Seq("label", "feature", "weight"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_cw(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF",
-      Seq("label", "feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_arow(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF",
-      Seq("label", "feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_scw(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1",
-      Seq("label", "feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_scw2(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2",
-      Seq("label", "feature", "weight", "conv"), setMixServs(exprs: _*))
-  }
-
-  /**
-   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedDataEx]] for all the available aggregate functions.
-   *
-   * TODO: This class bypasses the original GroupedData
-   * so as to support user-defined aggregations.
-   * Need a more smart injection into existing DataFrame APIs.
-   *
-   * A list of added Hivemall UDAF:
-   *  - voted_avg
-   *  - weight_voted_avg
-   *  - argmin_kld
-   *  - max_label
-   *  - maxrow
-   *  - f1score
-   *  - mae
-   *  - mse
-   *  - rmse
-   *
-   * @group ensemble
-   */
-  @scala.annotation.varargs
-  def groupby(cols: Column*): GroupedDataEx = {
-    new GroupedDataEx(df, cols.map(_.expr), GroupedData.GroupByType)
-  }
-
-  /**
-   * Groups the [[DataFrame]] by the columns named `col1` and `cols`.
-   * Same as the [[Column]]-based overload, but resolves each name against this DataFrame.
-   *
-   * @group ensemble
-   */
-  @scala.annotation.varargs
-  def groupby(col1: String, cols: String*): GroupedDataEx = {
-    val groupingExprs = (col1 +: cols).map(name => df(name).expr)
-    new GroupedDataEx(df, groupingExprs, GroupedData.GroupByType)
-  }
-
-  /**
-   * @see hivemall.knn.lsh.MinHashUDTF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def minhash(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.knn.lsh.MinHashUDTF", Seq("clusterid", "item"), exprs)
-  }
-
-  /**
-   * Output columns are named after every argument except the first.
-   *
-   * @see hivemall.ftvec.amplify.AmplifierUDTF
-   * @group ftvec.amplify
-   */
-  @scala.annotation.varargs
-  def amplify(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.ftvec.amplify.AmplifierUDTF", columnNames(exprs.drop(1)), exprs)
-  }
-
-  /**
-   * Output columns are named after every argument except the first two.
-   *
-   * @see hivemall.ftvec.amplify.RandomAmplifierUDTF
-   * @group ftvec.amplify
-   */
-  @scala.annotation.varargs
-  def rand_amplify(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.ftvec.amplify.RandomAmplifierUDTF", columnNames(exprs.drop(2)), exprs)
-  }
-
-  /**
-   * Amplifies and shuffle data inside partitions.
-   * Each row is repeated `xtimes` times and the rows of each partition are then
-   * shuffled locally; no data moves between partitions.
-   * @group ftvec.amplify
-   */
-  def part_amplify(xtimes: Int): DataFrame = {
-    val rdd = df.rdd.mapPartitions({ iter =>
-      val elems = iter.flatMap(row => Seq.fill[Row](xtimes)(row))
-      // Need to check how this shuffling affects results
-      scala.util.Random.shuffle(elems)
-    }, preservesPartitioning = true)
-    df.sqlContext.createDataFrame(rdd, df.schema)
-  }
-
-  /**
-   * Quantifies input columns.
-   * The first argument is not an output column, so outputs are named `c0` .. `c{n-2}`.
-   * @see hivemall.ftvec.conv.QuantifyColumnsUDTF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def quantify(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.ftvec.conv.QuantifyColumnsUDTF",
-      (0 until exprs.size - 1).map(i => s"c$i"), exprs)
-  }
-
-  /**
-   * Outputs are named `c0` .. `c{n-2}` for `n` input columns.
-   * @see hivemall.ftvec.trans.BinarizeLabelUDTF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def binarize_label(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.ftvec.trans.BinarizeLabelUDTF",
-      (0 until exprs.size - 1).map(i => s"c$i"), exprs)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.QuantifiedFeaturesUDTF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def quantified_features(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.ftvec.trans.QuantifiedFeaturesUDTF", Seq("features"), exprs)
-  }
-
-  /**
-   * Splits Seq[String] into pieces.
-   * @group ftvec
-   */
-  def explode_array(expr: Column): DataFrame = {
-    df.explode(expr) { case Row(v: Seq[_]) =>
-      // Type erasure removes the component type in Seq
-      v.map(s => HivemallFeature(s.asInstanceOf[String]))
-    }
-  }
-
-  /**
-   * Same as the [[Column]]-based overload, resolving `expr` as a column name.
-   * @group ftvec
-   */
-  def explode_array(expr: String): DataFrame =
-    this.explode_array(df(expr))
-
-  /**
-   * Returns a top-`k` records for each `group`.
-   * The input is first repartitioned and sorted by `group` within partitions so
-   * that the UDTF sees the rows of each group contiguously.
-   * @group misc
-   */
-  def each_top_k(k: Column, group: Column, value: Column, args: Column*): DataFrame = {
-    val clusterDf = df.repartition(group).sortWithinPartitions(group)
-    hiveUDTF("hivemall.tools.EachTopKUDTF",
-      Seq("rank", "key") ++ args.map(_.named.name),
-      Seq(k, group, value) ++ args,
-      clusterDf.logicalPlan)
-  }
-
-  /**
-   * Returns a new [[DataFrame]] with columns renamed.
-   * This is a wrapper for DataFrame#toDF.
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def as(colNames: String*): DataFrame = df.toDF(colNames: _*)
-
-  /**
-   * Returns all the columns as Seq[Column] in this [[DataFrame]].
-   * @group misc
-   */
-  def cols: Seq[Column] = {
-    df.schema.fields.map(field => df.col(field.name)).toSeq
-  }
-
-  /**
-   * @see hivemall.dataset.LogisticRegressionDataGeneratorUDTF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def lr_datagen(exprs: Column*): DataFrame = {
-    hiveUDTF("hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper",
-      Seq("label", "features"), exprs)
-  }
-
-  /**
-   * :: Experimental ::
-   * If a parameter '-mix' does not exist in a 3rd argument,
-   * set it from an environmental variable
-   * 'HIVEMALL_MIX_SERVERS'.
-   *
-   * Only the two-argument form is extended: a 3rd literal argument
-   * `-mix <servers> -mix_session <applicationId>-<random UUID>` is appended.
-   * Any other arity is returned unchanged.
-   *
-   * TODO: This could work if '--deploy-mode' has 'client';
-   *  otherwise, we need to set HIVEMALL_MIX_SERVERS
-   *  in all possible spark workers.
-   */
-  private[this] def setMixServs(exprs: Column*): Seq[Column] = {
-    Option(System.getenv("HIVEMALL_MIX_SERVERS")).filter(_.nonEmpty) match {
-      case Some(mixes) =>
-        val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID
-        logInfo(s"set '${mixes}' as default mix servers (session: ${groupId})")
-        if (exprs.size == 2) {
-          exprs :+ Column(Literal.create(s"-mix ${mixes} -mix_session ${groupId}", StringType))
-        } else {
-          // TODO: Add codes in the case where exprs.size == 3.
-          exprs
-        }
-      case None =>
-        exprs
-    }
-  }
-}
-
-object HivemallOps {
-
-  /**
-   * Implicitly inject the [[HivemallOps]] into [[DataFrame]].
-   */
-  implicit def dataFrameToHivemallOps(df: DataFrame): HivemallOps =
-    new HivemallOps(df)
-
-  /**
-   * An implicit conversion to avoid doing annoying transformation.
-   */
-  @inline private implicit def toColumn(expr: Expression): Column = Column(expr)
-
-  /** Applies the Hive `GenericUDF` class `className` to `args` via [[HiveGenericUDF]]. */
-  private def hiveGenericUDF(className: String, args: Seq[Column]): Column =
-    HiveGenericUDF(new HiveFunctionWrapper(className), args.map(_.expr))
-
-  /** Applies the Hive `UDF` class `className` to `args` via [[HiveSimpleUDF]]. */
-  private def hiveSimpleUDF(className: String, args: Seq[Column]): Column =
-    HiveSimpleUDF(new HiveFunctionWrapper(className), args.map(_.expr))
-
-  /**
-   * @see hivemall.HivemallVersionUDF
-   * @group misc
-   */
-  def hivemall_version(): Column =
-    hiveSimpleUDF("hivemall.HivemallVersionUDF", Nil)
-
-  /**
-   * @see hivemall.knn.similarity.CosineSimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def cosine_sim(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.similarity.CosineSimilarityUDF", exprs)
-
-  /**
-   * @see hivemall.knn.similarity.JaccardIndexUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def jaccard(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.knn.similarity.JaccardIndexUDF", exprs)
-
-  /**
-   * @see hivemall.knn.similarity.AngularSimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def angular_similarity(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.similarity.AngularSimilarityUDF", exprs)
-
-  /**
-   * @see hivemall.knn.similarity.EuclidSimilarity
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def euclid_similarity(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.similarity.EuclidSimilarity", exprs)
-
-  /**
-   * @see hivemall.knn.similarity.Distance2SimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def distance2similarity(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because of using unsupported types
-    hiveGenericUDF("hivemall.knn.similarity.Distance2SimilarityUDF", exprs)
-  }
-
-  /**
-   * @see hivemall.knn.distance.HammingDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def hamming_distance(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.knn.distance.HammingDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.PopcountUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def popcnt(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.knn.distance.PopcountUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.KLDivergenceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def kld(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.knn.distance.KLDivergenceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.EuclidDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def euclid_distance(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.distance.EuclidDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.CosineDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def cosine_distance(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.distance.CosineDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.AngularDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def angular_distance(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.distance.AngularDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.ManhattanDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def manhattan_distance(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.distance.ManhattanDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.distance.MinkowskiDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def minkowski_distance(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.distance.MinkowskiDistanceUDF", exprs)
-
-  /**
-   * @see hivemall.knn.lsh.bBitMinHashUDF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def bbit_minhash(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.knn.lsh.bBitMinHashUDF", exprs)
-
-  /**
-   * @see hivemall.knn.lsh.MinHashesUDF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def minhashes(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.knn.lsh.MinHashesUDFWrapper", exprs)
-
-  /**
-   * @see hivemall.ftvec.AddBiasUDF
-   * @group ftvec
-   */
-  @scala.annotation.varargs
-  def add_bias(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.ftvec.AddBiasUDFWrapper", exprs)
-
-  /**
-   * Extracts the feature part of each entry; the result column is named `feature`.
-   *
-   * @see hivemall.ftvec.ExtractFeatureUDF
-   * @group ftvec
-   *
-   * TODO: This throws java.lang.ClassCastException because
-   *  HiveInspectors.toInspector has a bug in spark.
-   *  Need to fix it later.
-   */
-  def extract_feature(expr: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.ExtractFeatureUDFWrapper", expr :: Nil).as("feature")
-
-  /**
-   * Extracts the weight part of each entry; the result column is named `value`.
-   *
-   * @see hivemall.ftvec.ExtractWeightUDF
-   * @group ftvec
-   *
-   * TODO: This throws java.lang.ClassCastException because
-   *  HiveInspectors.toInspector has a bug in spark.
-   *  Need to fix it later.
-   */
-  def extract_weight(expr: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.ExtractWeightUDFWrapper", expr :: Nil).as("value")
-
-  /**
-   * @see hivemall.ftvec.AddFeatureIndexUDFWrapper
-   * @group ftvec
-   */
-  def add_feature_index(expr: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.AddFeatureIndexUDFWrapper", expr :: Nil)
-
-  /**
-   * @see hivemall.ftvec.SortByFeatureUDF
-   * @group ftvec
-   */
-  def sort_by_feature(expr: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.SortByFeatureUDFWrapper", expr :: Nil)
-
-  /**
-   * @see hivemall.ftvec.hashing.MurmurHash3UDF
-   * @group ftvec.hashing
-   */
-  def mhash(expr: Column): Column =
-    hiveSimpleUDF("hivemall.ftvec.hashing.MurmurHash3UDF", expr :: Nil)
-
-  /**
-   * @see hivemall.ftvec.hashing.Sha1UDF
-   * @group ftvec.hashing
-   */
-  def sha1(expr: Column): Column =
-    hiveSimpleUDF("hivemall.ftvec.hashing.Sha1UDF", expr :: Nil)
-
-  /**
-   * @see hivemall.ftvec.hashing.ArrayHashValuesUDF
-   * @group ftvec.hashing
-   */
-  @scala.annotation.varargs
-  def array_hash_values(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because of using unsupported types
-    hiveSimpleUDF("hivemall.ftvec.hashing.ArrayHashValuesUDF", exprs)
-  }
-
-  /**
-   * @see hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF
-   * @group ftvec.hashing
-   */
-  @scala.annotation.varargs
-  def prefixed_hash_values(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because of using unsupported types
-    hiveSimpleUDF("hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF", exprs)
-  }
-
-  /**
-   * @see hivemall.ftvec.scaling.RescaleUDF
-   * @group ftvec.scaling
-   */
-  @scala.annotation.varargs
-  def rescale(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.ftvec.scaling.RescaleUDF", exprs)
-
-  /**
-   * @see hivemall.ftvec.scaling.ZScoreUDF
-   * @group ftvec.scaling
-   */
-  @scala.annotation.varargs
-  def zscore(exprs: Column*): Column =
-    hiveSimpleUDF("hivemall.ftvec.scaling.ZScoreUDF", exprs)
-
-  /**
-   * @see hivemall.ftvec.scaling.L2NormalizationUDF
-   * @group ftvec.scaling
-   */
-  def normalize(expr: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.scaling.L2NormalizationUDFWrapper", expr :: Nil)
-
-  /**
-   * @see hivemall.ftvec.selection.ChiSquareUDF
-   * @group ftvec.selection
-   */
-  def chi2(observed: Column, expected: Column): Column =
-    hiveGenericUDF("hivemall.ftvec.selection.ChiSquareUDF", Seq(observed, expected))
-
-  /**
-   * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def to_dense_features(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because of using unsupported types
-    hiveGenericUDF("hivemall.ftvec.conv.ToDenseFeaturesUDF", exprs)
-  }
-
-  /**
-   * @see hivemall.ftvec.conv.ToSparseFeaturesUDF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def to_sparse_features(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because of using unsupported types
-    hiveGenericUDF("hivemall.ftvec.conv.ToSparseFeaturesUDF", exprs)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.VectorizeFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def vectorize_features(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.ftvec.trans.VectorizeFeaturesUDF", exprs)
-
-  /**
-   * @see hivemall.ftvec.trans.CategoricalFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def categorical_features(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.ftvec.trans.CategoricalFeaturesUDF", exprs)
-
-  /**
-   * @see hivemall.ftvec.trans.IndexedFeatures
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def indexed_features(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.ftvec.trans.IndexedFeatures", exprs)
-
-  /**
-   * @see hivemall.ftvec.trans.QuantitativeFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def quantitative_features(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.ftvec.trans.QuantitativeFeaturesUDF", exprs)
-
-  /**
-   * @see hivemall.smile.tools.TreePredictUDF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def tree_predict(exprs: Column*): Column =
-    hiveGenericUDF("hivemall.smile.tools.TreePredictUDF", exprs)
-
-  /**
-   * @see hivemall.tools.array.SelectKBestUDF
-   * @group tools.array
-   */
-  def select_k_best(X: Column, importanceList: Column, k: Column): Column =
-    hiveGenericUDF("hivemall.tools.array.SelectKBestUDF", Seq(X, importanceList, k))
-
-  /**
-   * Computes `1 / (1 + exp(-x))` for the first argument using built-in Spark
-   * expressions rather than the Hive UDF; any further arguments are ignored.
-   *
-   * @see hivemall.tools.math.SigmoidUDF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def sigmoid(exprs: Column*): Column = {
-    // TODO: SigmoidUDF only accepts floating-point types in spark-v1.5.0?
-    val value = exprs.head
-    // A fresh literal per use, as in the original expression tree.
-    def one: Column = Column(Literal.create(1.0, DoubleType))
-    one / (one + exp(-value))
-  }
-
-  /**
-   * Returns a row id column named `rowid`.
-   *
-   * @see hivemall.tools.mapred.RowIdUDF
-   * @group misc
-   */
-  def rowid(): Column =
-    hiveGenericUDF("hivemall.tools.mapred.RowIdUDFWrapper", Nil).as("rowid")
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
deleted file mode 100644
index dff62b3..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
-import org.apache.spark.sql.{Column, DataFrame, Row, UserDefinedFunction}
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
-
-object HivemallUtils {
-
- // # of maximum dimensions for feature vectors
- val maxDims = 100000000
-
- /**
- * An implicit conversion to avoid doing annoying transformation.
- * This class must be in o.a.spark.sql._ because
- * a Column class is private.
- */
- @inline implicit def toBooleanLiteral(i: Boolean): Column = Column(Literal.create(i, BooleanType))
- @inline implicit def toIntLiteral(i: Int): Column = Column(Literal.create(i, IntegerType))
- @inline implicit def toFloatLiteral(i: Float): Column = Column(Literal.create(i, FloatType))
- @inline implicit def toDoubleLiteral(i: Double): Column = Column(Literal.create(i, DoubleType))
- @inline implicit def toStringLiteral(i: String): Column = Column(Literal.create(i, StringType))
- @inline implicit def toIntArrayLiteral(i: Seq[Int]): Column =
- Column(Literal.create(i, ArrayType(IntegerType)))
- @inline implicit def toStringArrayLiteral(i: Seq[String]): Column =
- Column(Literal.create(i, ArrayType(StringType)))
-
- /**
- * Check whether the given schema contains a column of the required data type.
- * @param colName column name
- * @param dataType required column data type
- */
- def checkColumnType(schema: StructType, colName: String, dataType: DataType): Unit = {
- val actualDataType = schema(colName).dataType
- require(actualDataType.equals(dataType),
- s"Column $colName must be of type $dataType but was actually $actualDataType.")
- }
-
- /**
- * Make up a function object from a Hivemall model.
- */
- def funcModel(df: DataFrame, dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
- checkColumnType(df.schema, "feature", StringType)
- checkColumnType(df.schema, "weight", DoubleType)
-
- import df.sqlContext.implicits._
- val intercept = df
- .where($"feature" === "0")
- .select($"weight")
- .map { case Row(weight: Double) => weight}
- .reduce(_ + _)
- val weights = funcVectorizerImpl(dense, dims)(
- df.select($"feature", $"weight")
- .where($"feature" !== "0")
- .map { case Row(label: String, feature: Double) => s"${label}:$feature"}
- .collect.toSeq)
-
- udf((input: Vector) => BLAS.dot(input, weights) + intercept)
- }
-
- /**
- * Make up a function object to transform Hivemall features into Vector.
- */
- def funcVectorizer(dense: Boolean = false, dims: Int = maxDims)
- : UserDefinedFunction = {
- udf(funcVectorizerImpl(dense, dims))
- }
-
- private def funcVectorizerImpl(dense: Boolean, dims: Int)
- : Seq[String] => Vector = {
- if (dense) {
- // Dense features
- i: Seq[String] => {
- val features = new Array[Double](dims)
- i.map { ft =>
- val s = ft.split(":").ensuring(_.size == 2)
- features(s(0).toInt) = s(1).toDouble
- }
- Vectors.dense(features)
- }
- } else {
- // Sparse features
- i: Seq[String] => {
- val features = i.map { ft =>
- // val s = ft.split(":").ensuring(_.size == 2)
- val s = ft.split(":")
- (s(0).toInt, s(1).toDouble)
- }
- Vectors.sparse(dims, features)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/resources/log4j.properties b/spark/spark-1.6/src/test/resources/log4j.properties
deleted file mode 100644
index 1db11f0..0000000
--- a/spark/spark-1.6/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=FATAL, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala b/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
deleted file mode 100644
index dbb818b..0000000
--- a/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.mix.server
-
-import java.util.Random
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
-import java.util.logging.Logger
-
-import hivemall.mix.MixMessage.MixEventName
-import hivemall.mix.client.MixClient
-import hivemall.mix.server.MixServer.ServerState
-import hivemall.model.{DenseModel, PredictionModel, WeightValue}
-import hivemall.utils.io.IOUtils
-import hivemall.utils.lang.CommandLineUtils
-import hivemall.utils.net.NetUtils
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MixServerSuite extends FunSuite with BeforeAndAfter {
-
- private[this] var server: MixServer = _
- private[this] var executor : ExecutorService = _
- private[this] var port: Int = _
-
- private[this] val rand = new Random(43)
- private[this] val counter = Stream.from(0).iterator
-
- private[this] val eachTestTime = 100
- private[this] val logger =
- Logger.getLogger(classOf[MixServerSuite].getName)
-
- before {
- this.port = NetUtils.getAvailablePort
- this.server = new MixServer(
- CommandLineUtils.parseOptions(
- Array("-port", s"${port}", "-sync_threshold", "3"),
- MixServer.getOptions()
- )
- )
- this.executor = Executors.newSingleThreadExecutor
- this.executor.submit(server)
- var retry = 0
- while (server.getState() != ServerState.RUNNING && retry < 50) {
- Thread.sleep(1000L)
- retry += 1
- }
- assert(server.getState == ServerState.RUNNING)
- }
-
- after { this.executor.shutdown() }
-
- private[this] def clientDriver(
- groupId: String, model: PredictionModel, numMsg: Int = 1000000): Unit = {
- var client: MixClient = null
- try {
- client = new MixClient(MixEventName.average, groupId, s"localhost:${port}", false, 2, model)
- model.configureMix(client, false)
- model.configureClock()
-
- for (_ <- 0 until numMsg) {
- val feature = Integer.valueOf(rand.nextInt(model.size))
- model.set(feature, new WeightValue(1.0f))
- }
-
- while (true) { Thread.sleep(eachTestTime * 1000 + 100L) }
- assert(model.getNumMixed > 0)
- } finally {
- IOUtils.closeQuietly(client)
- }
- }
-
- private[this] def fixedGroup: (String, () => String) =
- ("fixed", () => "fixed")
- private[this] def uniqueGroup: (String, () => String) =
- ("unique", () => s"${counter.next}")
-
- Seq(65536).map { ndims =>
- Seq(4).map { nclient =>
- Seq(fixedGroup, uniqueGroup).map { id =>
- val testName = s"dense-dim:${ndims}-clinet:${nclient}-${id._1}"
- ignore(testName) {
- val clients = Executors.newCachedThreadPool()
- val numClients = nclient
- val models = (0 until numClients).map(i => new DenseModel(ndims, false))
- (0 until numClients).map { i =>
- clients.submit(new Runnable() {
- override def run(): Unit = {
- try {
- clientDriver(
- s"${testName}-${id._2}",
- models(i)
- )
- } catch {
- case e: InterruptedException =>
- assert(false, e.getMessage)
- }
- }
- })
- }
- clients.awaitTermination(eachTestTime, TimeUnit.SECONDS)
- clients.shutdown()
- val nMixes = models.map(d => d.getNumMixed).reduce(_ + _)
- logger.info(s"${testName} --> ${(nMixes + 0.0) / eachTestTime} mixes/s")
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala b/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
deleted file mode 100644
index f203fc2..0000000
--- a/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.sql.hive.test.TestHive
-
-class RegressionDatagenSuite extends FunSuite {
-
- test("datagen") {
- val df = RegressionDatagen.exec(
- TestHive, min_examples = 10000, n_features = 100, n_dims = 65536, dense = false, cl = true)
- assert(df.count() >= 10000)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
deleted file mode 100644
index 991e46f..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark
-
-// scalastyle:off
-import org.scalatest.{FunSuite, Outcome}
-
-/**
- * Base abstract class for all unit tests in Spark for handling common functionality.
- */
-private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
-// scalastyle:on
-
- /**
- * Log the suite name and the test name before and after each test.
- *
- * Subclasses should never override this method. If they wish to run
- * custom code before and after each test, they should mix in the
- * {{org.scalatest.BeforeAndAfter}} trait instead.
- */
- final protected override def withFixture(test: NoArgTest): Outcome = {
- val testName = test.text
- val suiteName = this.getClass.getName
- val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
- try {
- logInfo(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n")
- test()
- } finally {
- logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
deleted file mode 100644
index f57983f..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.ml.feature
-
-import org.apache.spark.SparkFunSuite
-
-class HivemallLabeledPointSuite extends SparkFunSuite {
-
- test("toString") {
- val lp = HivemallLabeledPoint(1.0f, Seq("1:0.5", "3:0.3", "8:0.1"))
- assert(lp.toString === "1.0,[1:0.5,3:0.3,8:0.1]")
- }
-
- test("parse") {
- val lp = HivemallLabeledPoint.parse("1.0,[1:0.5,3:0.3,8:0.1]")
- assert(lp.label === 1.0)
- assert(lp.features === Seq("1:0.5", "3:0.3", "8:0.1"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
deleted file mode 100644
index ef520ae..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql
-
-import java.util.{Locale, TimeZone}
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{LogicalRDD, Queryable}
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-abstract class QueryTest extends PlanTest {
-
- protected def sqlContext: SQLContext
-
- // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
- TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
- // Add Locale setting
- Locale.setDefault(Locale.US)
-
- /**
- * Runs the plan and makes sure the answer contains all of the keywords, or the
- * none of keywords are listed in the answer
- * @param df the [[DataFrame]] to be executed
- * @param exists true for make sure the keywords are listed in the output, otherwise
- * to make sure none of the keyword are not listed in the output
- * @param keywords keyword in string array
- */
- def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) {
- val outputs = df.collect().map(_.mkString).mkString
- for (key <- keywords) {
- if (exists) {
- assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
- } else {
- assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
- }
- }
- }
-
- /**
- * Evaluates a dataset to make sure that the result of calling collect matches the given
- * expected answer.
- * - Special handling is done based on whether the query plan should be expected to return
- * the results in sorted order.
- * - This function also checks to make sure that the schema for serializing the expected answer
- * matches that produced by the dataset (i.e. does manual construction of object match
- * the constructed encoder for cases like joins, etc). Note that this means that it will fail
- * for cases where reordering is done on fields. For such tests, user `checkDecoding` instead
- * which performs a subset of the checks done by this function.
- */
- protected def checkAnswer[T](
- ds: Dataset[T],
- expectedAnswer: T*): Unit = {
- checkAnswer(
- ds.toDF(),
- sqlContext.createDataset(expectedAnswer)(ds.unresolvedTEncoder).toDF().collect().toSeq)
-
- checkDecoding(ds, expectedAnswer: _*)
- }
-
- protected def checkDecoding[T](
- ds: => Dataset[T],
- expectedAnswer: T*): Unit = {
- val decoded = try ds.collect().toSet catch {
- case e: Exception =>
- fail(
- s"""
- |Exception collecting dataset as objects
- |${ds.resolvedTEncoder}
- |${ds.resolvedTEncoder.fromRowExpression.treeString}
- |${ds.queryExecution}
- """.stripMargin, e)
- }
-
- if (decoded != expectedAnswer.toSet) {
- val expected = expectedAnswer.toSet.toSeq.map((a: Any) => a.toString).sorted
- val actual = decoded.toSet.toSeq.map((a: Any) => a.toString).sorted
-
- val comparision = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
- fail(
- s"""Decoded objects do not match expected objects:
- |$comparision
- |${ds.resolvedTEncoder.fromRowExpression.treeString}
- """.stripMargin)
- }
- }
-
- /**
- * Runs the plan and makes sure the answer matches the expected result.
- * @param df the [[DataFrame]] to be executed
- * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
- */
- protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
- val analyzedDF = try df catch {
- case ae: AnalysisException =>
- val currentValue = sqlContext.conf.dataFrameEagerAnalysis
- sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
- val partiallyAnalzyedPlan = df.queryExecution.analyzed
- sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, currentValue)
- fail(
- s"""
- |Failed to analyze query: $ae
- |$partiallyAnalzyedPlan
- |
- |${stackTraceToString(ae)}
- |""".stripMargin)
- }
-
- assertEmptyMissingInput(df)
-
- QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
- case Some(errorMessage) => fail(errorMessage)
- case None =>
- }
- }
-
- protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = {
- checkAnswer(df, Seq(expectedAnswer))
- }
-
- protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
- checkAnswer(df, expectedAnswer.collect())
- }
-
- /**
- * Runs the plan and makes sure the answer is within absTol of the expected result.
- * @param dataFrame the [[DataFrame]] to be executed
- * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
- * @param absTol the absolute tolerance between actual and expected answers.
- */
- protected def checkAggregatesWithTol(dataFrame: DataFrame,
- expectedAnswer: Seq[Row],
- absTol: Double): Unit = {
- // TODO: catch exceptions in data frame execution
- val actualAnswer = dataFrame.collect()
- require(actualAnswer.length == expectedAnswer.length,
- s"actual num rows ${actualAnswer.length} != expected num of rows ${expectedAnswer.length}")
-
- actualAnswer.zip(expectedAnswer).foreach {
- case (actualRow, expectedRow) =>
- QueryTest.checkAggregatesWithTol(actualRow, expectedRow, absTol)
- }
- }
-
- protected def checkAggregatesWithTol(dataFrame: DataFrame,
- expectedAnswer: Row,
- absTol: Double): Unit = {
- checkAggregatesWithTol(dataFrame, Seq(expectedAnswer), absTol)
- }
-
- /**
- * Asserts that a given [[Queryable]] will be executed using the given number of cached results.
- */
- def assertCached(query: Queryable, numCachedTables: Int = 1): Unit = {
- val planWithCaching = query.queryExecution.withCachedData
- val cachedData = planWithCaching collect {
- case cached: InMemoryRelation => cached
- }
-
- assert(
- cachedData.size == numCachedTables,
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
- planWithCaching)
- }
-
- /**
- * Asserts that a given [[Queryable]] does not have missing inputs in all the analyzed plans.
- */
- def assertEmptyMissingInput(query: Queryable): Unit = {
- assert(query.queryExecution.analyzed.missingInput.isEmpty,
- s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
- assert(query.queryExecution.optimizedPlan.missingInput.isEmpty,
- s"The optimized logical plan has missing inputs: ${query.queryExecution.optimizedPlan}")
- assert(query.queryExecution.executedPlan.missingInput.isEmpty,
- s"The physical plan has missing inputs: ${query.queryExecution.executedPlan}")
- }
-}
-
-object QueryTest {
- /**
- * Runs the plan and makes sure the answer matches the expected result.
- * If there was exception during the execution or the contents of the DataFrame does not
- * match the expected result, an error message will be returned. Otherwise, a [[None]] will
- * be returned.
- * @param df the [[DataFrame]] to be executed
- * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
- */
- def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
- val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
-
- // We need to call prepareRow recursively to handle schemas with struct types.
- def prepareRow(row: Row): Row = {
- Row.fromSeq(row.toSeq.map {
- case null => null
- case d: java.math.BigDecimal => BigDecimal(d)
- // Convert array to Seq for easy equality check.
- case b: Array[_] => b.toSeq
- case r: Row => prepareRow(r)
- case o => o
- })
- }
-
- def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
- // Converts data to types that we can do equality comparison using Scala collections.
- // For BigDecimal type, the Scala type has a better definition of equality test (similar to
- // Java's java.math.BigDecimal.compareTo).
- // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for
- // equality test.
- val converted: Seq[Row] = answer.map(prepareRow)
- if (!isSorted) converted.sortBy(_.toString()) else converted
- }
- val sparkAnswer = try df.collect().toSeq catch {
- case e: Exception =>
- val errorMessage =
- s"""
- |Exception thrown while executing query:
- |${df.queryExecution}
- |== Exception ==
- |$e
- |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
- """.stripMargin
- return Some(errorMessage)
- }
-
- if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
- val errorMessage =
- s"""
- |Results do not match for query:
- |${df.queryExecution}
- |== Results ==
- |${sideBySide(
- s"== Correct Answer - ${expectedAnswer.size} ==" +:
- prepareAnswer(expectedAnswer).map(_.toString()),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
- """.stripMargin
- return Some(errorMessage)
- }
-
- return None
- }
-
- /**
- * Runs the plan and makes sure the answer is within absTol of the expected result.
- * @param actualAnswer the actual result in a [[Row]].
- * @param expectedAnswer the expected result in a[[Row]].
- * @param absTol the absolute tolerance between actual and expected answers.
- */
- protected def checkAggregatesWithTol(actualAnswer: Row, expectedAnswer: Row, absTol: Double) = {
- require(actualAnswer.length == expectedAnswer.length,
- s"actual answer length ${actualAnswer.length} != " +
- s"expected answer length ${expectedAnswer.length}")
-
- // TODO: support other numeric types besides Double
- // TODO: support struct types?
- actualAnswer.toSeq.zip(expectedAnswer.toSeq).foreach {
- case (actual: Double, expected: Double) =>
- assert(math.abs(actual - expected) < absTol,
- s"actual answer $actual not within $absTol of correct answer $expected")
- case (actual, expected) =>
- assert(actual == expected, s"$actual did not equal $expected")
- }
- }
-
- def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = {
- checkAnswer(df, expectedAnswer.asScala) match {
- case Some(errorMessage) => errorMessage
- case None => null
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
deleted file mode 100644
index 816576e..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.catalyst.plans
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.util._
-
-/**
- * Provides helper methods for comparing plans.
- */
-class PlanTest extends SparkFunSuite {
-
- /**
- * Since attribute references are given globally unique ids during analysis,
- * we must normalize them to check if two different queries are identical.
- */
- protected def normalizeExprIds(plan: LogicalPlan) = {
- plan transformAllExpressions {
- case a: AttributeReference =>
- AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
- case a: Alias =>
- Alias(a.child, a.name)(exprId = ExprId(0))
- }
- }
-
- /** Fails the test if the two plans do not match */
- protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
- val normalized1 = normalizeExprIds(plan1)
- val normalized2 = normalizeExprIds(plan2)
- if (normalized1 != normalized2) {
- fail(
- s"""
- |== FAIL: Plans do not match ===
- |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
- """.stripMargin)
- }
- }
-
- /** Fails the test if the two expressions do not match */
- protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
- comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
deleted file mode 100644
index ded94ba..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.Row
-import org.apache.spark.test.HivemallQueryTest
-
-/**
- * Exercises Hivemall Hive UDFs/UDTFs through Spark SQL by registering each one with
- * `CREATE TEMPORARY FUNCTION` and querying it.
- */
-final class HiveUdfSuite extends HivemallQueryTest {
-
-  import hiveContext.implicits._
-  import hiveContext._
-
-  test("hivemall_version") {
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION hivemall_version
-         | AS '${classOf[hivemall.HivemallVersionUDF].getName}'
-       """.stripMargin)
-
-    // NOTE: the expected version string is hard-coded and must be bumped on each release.
-    checkAnswer(
-      sql("SELECT DISTINCT hivemall_version()"),
-      Row("0.4.2-rc.2")
-    )
-
-    // sql("DROP TEMPORARY FUNCTION IF EXISTS hivemall_version")
-    // reset()
-  }
-
-  // Trains on TinyTrainData with add_bias applied to the features, averages the emitted
-  // per-feature weights, and checks the model covers exactly features "0", "1" and "2".
-  test("train_logregr") {
-    TinyTrainData.registerTempTable("TinyTrainData")
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION train_logregr
-         | AS '${classOf[hivemall.regression.LogressUDTF].getName}'
-       """.stripMargin)
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION add_bias
-         | AS '${classOf[hivemall.ftvec.AddBiasUDFWrapper].getName}'
-       """.stripMargin)
-
-    // Average the (feature, weight) pairs emitted by the UDTF into one weight per feature.
-    val model = sql(
-      """
-        | SELECT feature, AVG(weight) AS weight
-        | FROM (
-        | SELECT train_logregr(add_bias(features), label) AS (feature, weight)
-        | FROM TinyTrainData
-        | ) t
-        | GROUP BY feature
-      """.stripMargin)
-
-    checkAnswer(
-      model.select($"feature"),
-      Seq(Row("0"), Row("1"), Row("2"))
-    )
-
-    // TODO: Why 'train_logregr' is not registered in HiveMetaStore?
-    // ERROR RetryingHMSHandler: MetaException(message:NoSuchObjectException
-    // (message:Function default.train_logregr does not exist))
-    //
-    // hiveContext.sql("DROP TEMPORARY FUNCTION IF EXISTS train_logregr")
-    // hiveContext.reset()
-  }
-
-  // each_top_k(k, group, score, args...) yields, per the expected rows below, a leading
-  // column (1 here, presumably the rank), the score, then the trailing args; a positive k
-  // keeps the highest-scored rows of each group and a negative k the lowest-scored ones.
-  test("each_top_k") {
-    val testDf = Seq(
-      ("a", "1", 0.5, Array(0, 1, 2)),
-      ("b", "5", 0.1, Array(3)),
-      ("a", "3", 0.8, Array(2, 5)),
-      ("c", "6", 0.3, Array(1, 3)),
-      ("b", "4", 0.3, Array(2)),
-      ("a", "2", 0.6, Array(1))
-    ).toDF("key", "value", "score", "data")
-
-    import testDf.sqlContext.implicits._
-    // Co-locate and sort rows by key. Presumably each_top_k needs each group's rows to
-    // arrive contiguously (TODO confirm against EachTopKUDTF).
-    testDf.repartition($"key").sortWithinPartitions($"key").registerTempTable("TestData")
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION each_top_k
-         | AS '${classOf[hivemall.tools.EachTopKUDTF].getName}'
-       """.stripMargin)
-
-    // Compute top-1 rows for each group
-    assert(
-      sql("SELECT each_top_k(1, key, score, key, value) FROM TestData").collect().toSet ===
-        Set(
-          Row(1, 0.8, "a", "3"),
-          Row(1, 0.3, "b", "4"),
-          Row(1, 0.3, "c", "6")
-        ))
-
-    // Compute reverse top-1 rows for each group
-    assert(
-      sql("SELECT each_top_k(-1, key, score, key, value) FROM TestData").collect().toSet ===
-        Set(
-          Row(1, 0.5, "a", "1"),
-          Row(1, 0.1, "b", "5"),
-          Row(1, 0.3, "c", "6")
-        ))
-  }
-}