Posted to commits@hivemall.apache.org by my...@apache.org on 2017/04/09 21:43:39 UTC

[1/3] incubator-hivemall git commit: Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support

Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 8dc3a024d -> c53b9ff9b


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
deleted file mode 100644
index df82547..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{Column, Row}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.types._
-import org.apache.spark.test.HivemallQueryTest
-import org.apache.spark.test.TestDoubleWrapper._
-import org.apache.spark.test.TestUtils._
-
-final class HivemallOpsSuite extends HivemallQueryTest {
-
-  test("knn.similarity") {
-    val row1 = DummyInputData.select(cosine_sim(Seq(1, 2, 3, 4), Seq(3, 4, 5, 6))).collect
-    assert(row1(0).getFloat(0) ~== 0.500f)
-
-    val row2 = DummyInputData.select(jaccard(5, 6)).collect
-    assert(row2(0).getFloat(0) ~== 0.96875f)
-
-    val row3 = DummyInputData.select(angular_similarity(Seq(1, 2, 3), Seq(4, 5, 6))).collect
-    assert(row3(0).getFloat(0) ~== 0.500f)
-
-    val row4 = DummyInputData.select(euclid_similarity(Seq(5, 3, 1), Seq(2, 8, 3))).collect
-    assert(row4(0).getFloat(0) ~== 0.33333334f)
-
-    // TODO: $"c0" throws AnalysisException, why?
-    // val row5 = DummyInputData.select(distance2similarity(DummyInputData("c0"))).collect
-    // assert(row5(0).getFloat(0) ~== 0.1f)
-  }
-
-  test("knn.distance") {
-    val row1 = DummyInputData.select(hamming_distance(1, 3)).collect
-    assert(row1(0).getInt(0) == 1)
-
-    val row2 = DummyInputData.select(popcnt(1)).collect
-    assert(row2(0).getInt(0) == 1)
-
-    val row3 = DummyInputData.select(kld(0.1, 0.5, 0.2, 0.5)).collect
-    assert(row3(0).getDouble(0) ~== 0.01)
-
-    val row4 = DummyInputData.select(
-      euclid_distance(Seq("0.1", "0.5"), Seq("0.2", "0.5"))).collect
-    assert(row4(0).getFloat(0) ~== 1.4142135f)
-
-    val row5 = DummyInputData.select(
-      cosine_distance(Seq("0.8", "0.3"), Seq("0.4", "0.6"))).collect
-    assert(row5(0).getFloat(0) ~== 1.0f)
-
-    val row6 = DummyInputData.select(
-      angular_distance(Seq("0.1", "0.1"), Seq("0.3", "0.8"))).collect
-    assert(row6(0).getFloat(0) ~== 0.50f)
-
-    val row7 = DummyInputData.select(
-      manhattan_distance(Seq("0.7", "0.8"), Seq("0.5", "0.6"))).collect
-    assert(row7(0).getFloat(0) ~== 4.0f)
-
-    val row8 = DummyInputData.select(
-      minkowski_distance(Seq("0.1", "0.2"), Seq("0.2", "0.2"), 1.0)).collect
-    assert(row8(0).getFloat(0) ~== 2.0f)
-  }
-
-  test("knn.lsh") {
-    import hiveContext.implicits._
-    assert(IntList2Data.minhash(1, $"target").count() > 0)
-
-    assert(DummyInputData.select(bbit_minhash(Seq("1:0.1", "2:0.5"), false)).count
-      == DummyInputData.count)
-    assert(DummyInputData.select(minhashes(Seq("1:0.1", "2:0.5"), false)).count
-      == DummyInputData.count)
-  }
-
-  test("ftvec - add_bias") {
-    // TODO: Why does this import not work?
-    // import hiveContext.implicits._
-    assert(TinyTrainData.select(add_bias(TinyTrainData.col("features"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.8", "2:0.2", "0:1.0")),
-        Row(Seq("2:0.7", "0:1.0")),
-        Row(Seq("1:0.9", "0:1.0"))))
-  }
-
-  test("ftvec - extract_feature") {
-    val row = DummyInputData.select(extract_feature("1:0.8")).collect
-    assert(row(0).getString(0) == "1")
-  }
-
-  test("ftvec - extract_weight") {
-    val row = DummyInputData.select(extract_weight("3:0.1")).collect
-    assert(row(0).getDouble(0) ~== 0.1)
-  }
-
-  test("ftvec - explode_array") {
-    import hiveContext.implicits._
-    assert(TinyTrainData.explode_array("features")
-        .select($"feature").collect.toSet
-      === Set(Row("1:0.8"), Row("2:0.2"), Row("2:0.7"), Row("1:0.9")))
-  }
-
-  test("ftvec - add_feature_index") {
-    // import hiveContext.implicits._
-    val doubleListData = {
-      // TODO: Use `toDF`
-      val rowRdd = hiveContext.sparkContext.parallelize(
-          Row(0.8 :: 0.5 :: Nil) ::
-          Row(0.3 :: 0.1 :: Nil) ::
-          Row(0.2 :: Nil) ::
-          Nil
-        )
-      hiveContext.createDataFrame(
-        rowRdd,
-        StructType(
-          StructField("data", ArrayType(DoubleType), true) ::
-          Nil)
-        )
-    }
-
-    assert(doubleListData.select(
-        add_feature_index(doubleListData.col("data"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.8", "2:0.5")),
-        Row(Seq("1:0.3", "2:0.1")),
-        Row(Seq("1:0.2"))))
-  }
-
-  test("ftvec - sort_by_feature") {
-    // import hiveContext.implicits._
-    val intFloatMapData = {
-      // TODO: Use `toDF`
-      val rowRdd = hiveContext.sparkContext.parallelize(
-          Row(Map(1 -> 0.3f, 2 -> 0.1f, 3 -> 0.5f)) ::
-          Row(Map(2 -> 0.4f, 1 -> 0.2f)) ::
-          Row(Map(2 -> 0.4f, 3 -> 0.2f, 1 -> 0.1f, 4 -> 0.6f)) ::
-          Nil
-        )
-      hiveContext.createDataFrame(
-        rowRdd,
-        StructType(
-          StructField("data", MapType(IntegerType, FloatType), true) ::
-          Nil)
-        )
-    }
-
-    val sortedKeys = intFloatMapData.select(sort_by_feature(intFloatMapData.col("data")))
-      .collect.map {
-        case Row(m: Map[Int, Float]) => m.keysIterator.toSeq
-    }
-
-    assert(sortedKeys.toSet === Set(Seq(1, 2, 3), Seq(1, 2), Seq(1, 2, 3, 4)))
-  }
-
-  test("ftvec.hash") {
-    assert(DummyInputData.select(mhash("test")).count == DummyInputData.count)
-    assert(DummyInputData.select(sha1("test")).count == DummyInputData.count)
-    // assert(DummyInputData.select(array_hash_values(Seq("aaa", "bbb"))).count
-    //   == DummyInputData.count)
-    // assert(DummyInputData.select(prefixed_hash_values(Seq("ccc", "ddd"), "prefix")).count
-    //   == DummyInputData.count)
-  }
-
-  test("ftvec.scaling") {
-    assert(TinyTrainData.select(rescale(2.0f, 1.0, 5.0)).collect.toSet
-      === Set(Row(0.25f)))
-    assert(TinyTrainData.select(zscore(1.0f, 0.5, 0.5)).collect.toSet
-      === Set(Row(1.0f)))
-    assert(TinyTrainData.select(normalize(TinyTrainData.col("features"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.9701425", "2:0.24253562")),
-        Row(Seq("2:1.0")),
-        Row(Seq("1:1.0"))))
-  }
-
-  test("ftvec.selection - chi2") {
-    import hiveContext.implicits._
-
-    // see also hivemall.ftvec.selection.ChiSquareUDFTest
-    val df = Seq(
-      Seq(
-        Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
-        Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
-        Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)
-      ) -> Seq(
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589)))
-      .toDF("arg0", "arg1")
-
-    val result = df.select(chi2(df("arg0"), df("arg1"))).collect
-    assert(result.length == 1)
-    val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0)
-    val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1)
-
-    (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-
-    (pVal, Seq(4.47651499e-03, 1.65754167e-01, 5.94344354e-26, 2.50017968e-15))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-  }
-
-  test("ftvec.conv - quantify") {
-    import hiveContext.implicits._
-    val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
-    // This test is done in a single partition because `HivemallOps#quantify` assigns identifiers
-    // for non-numerical values in each partition.
-    assert(testDf.coalesce(1).quantify(Seq[Column](true) ++ testDf.cols: _*).collect.toSet
-      === Set(Row(1, 0, 0), Row(2, 1, 1), Row(3, 0, 1)))
-  }
-
-  test("ftvec.amplify") {
-    import hiveContext.implicits._
-    assert(TinyTrainData.amplify(3, $"label", $"features").count() == 9)
-    assert(TinyTrainData.rand_amplify(3, "-buf 128", $"label", $"features").count() == 9)
-    assert(TinyTrainData.part_amplify(3).count() == 9)
-  }
-
-  ignore("ftvec.conv") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1.0, "2:0.2" :: Nil)).toDF("a", "b")
-    assert(df1.select(to_dense_features(df1("b"), 3)).collect.toSet
-      === Set(Row(Array(0.1f, 0.0f, 0.3f)), Row(Array(0.0f, 0.2f, 0.0f))))
-
-    val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    assert(df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))).collect.toSet
-      === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4"))))
-  }
-
-  test("ftvec.trans") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c")
-    assert(df1.binarize_label($"a", $"b", $"c").collect.toSet === Set(Row(1, 1)))
-
-    val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b")
-    assert(df2.select(vectorize_features(Seq("a", "b"), df2("a"), df2("b"))).collect.toSet
-      === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
-
-    val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b")
-    assert(df3.select(categorical_features(Seq("a", "b"), df3("a"), df3("b"))).collect.toSet
-      === Set(Row(Seq("a#c11", "b#c12")), Row(Seq("a#c21", "b#c22"))))
-
-    val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    assert(df4.select(indexed_features(df4("a"), df4("b"), df4("c"))).collect.toSet
-      === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4"))))
-
-    val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c")
-    assert(df5.coalesce(1).quantified_features(true, df5("a"), df5("b"), df5("c")).collect.toSet
-      === Set(Row(Seq(0.0, 0.0, 0.0)), Row(Seq(1.0, 0.0, 1.0))))
-
-    val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b")
-    assert(df6.select(quantitative_features(Seq("a", "b"), df6("a"), df6("b"))).collect.toSet
-      === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
-  }
-
-  test("misc - hivemall_version") {
-    assert(DummyInputData.select(hivemall_version()).collect.toSet === Set(Row("0.4.2-rc.2")))
-    /**
-     * TODO: Why does the test below fail?
-     *
-     * checkAnswer(
-     *   DummyInputData.select(hivemall_version()).distinct,
-     *   Row("0.3.1")
-     * )
-     *
-     * The test throws the exception below:
-     *
-     * - hivemall_version *** FAILED ***
-     *  org.apache.spark.sql.AnalysisException:
-     *    Cannot resolve column name "HiveSimpleUDF#hivemall.HivemallVersionUDF()" among
-     *    (HiveSimpleUDF#hivemall.HivemallVersionUDF());
-     *   at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
-     *   at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
-     *   at scala.Option.getOrElse(Option.scala:120)
-     *   at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
-     *   at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
-     *   at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
-     *   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
-     *   ...
-     */
-  }
-
-  test("misc - rowid") {
-    val df = DummyInputData.select(rowid())
-    assert(df.distinct.count == df.count)
-  }
-
-  test("misc - each_top_k") {
-    import hiveContext.implicits._
-    val testDf = Seq(
-      ("a", "1", 0.5), ("b", "5", 0.1), ("a", "3", 0.8), ("c", "6", 0.3), ("b", "4", 0.3),
-      ("a", "2", 0.6)
-    ).toDF("group", "attr", "value")
-
-    // Compute top-1 rows for each group
-    assert(
-      testDf.each_top_k(1, $"group", $"value", testDf("attr")).collect.toSet ===
-      Set(
-        Row(1, 0.8, "3"),
-        Row(1, 0.3, "4"),
-        Row(1, 0.3, "6")
-      ))
-
-    // Compute reverse top-1 rows for each group
-    assert(
-      testDf.each_top_k(-1, $"group", $"value", testDf("attr")).collect.toSet ===
-      Set(
-        Row(1, 0.5, "1"),
-        Row(1, 0.1, "5"),
-        Row(1, 0.3, "6")
-      ))
-  }
-
-  /**
-   * This test fails because:
-   *
-   * Cause: java.lang.OutOfMemoryError: Java heap space
-   *  at hivemall.smile.tools.RandomForestEnsembleUDAF$Result.<init>
-   *    (RandomForestEnsembleUDAF.java:128)
-   *  at hivemall.smile.tools.RandomForestEnsembleUDAF$RandomForestPredictUDAFEvaluator
-   *    .terminate(RandomForestEnsembleUDAF.java:91)
-   *  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-   */
-  ignore("misc - tree_predict") {
-    import hiveContext.implicits._
-
-    val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil))
-      .toDF("label", "features")
-      .train_randomforest_regr($"features", $"label", "-trees 2")
-
-    val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: Nil))
-      .toDF("label", "features")
-      .select(rowid(), $"label", $"features")
-
-    val predicted = model
-      .join(testData).coalesce(1)
-      .select(
-        $"rowid",
-        tree_predict(
-          model("model_id"), model("model_type"), model("pred_model"), testData("features"), true)
-            .as("predicted")
-      )
-      .groupby($"rowid")
-      .rf_ensemble("predicted").as("rowid", "predicted")
-      .select($"predicted.label")
-
-    checkAnswer(predicted, Seq(Row(0), Row(1)))
-  }
-
-  test("tools.array - select_k_best") {
-    import hiveContext.implicits._
-    import org.apache.spark.sql.functions._
-
-    val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
-    val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list")
-    val k = 2
-
-    // If we use checkAnswer here, it fails for some reason (maybe a type issue?), but it works on spark-2.0
-    assert(df.select(select_k_best(df("features"), df("importance_list"), lit(k))).collect ===
-      data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))))
-  }
-
-  test("misc - sigmoid") {
-    import hiveContext.implicits._
-    /**
-     * TODO: SigmoidUDF only accepts floating-point types in spark-v1.5.0?
-     * This test throws an exception below:
-     *
-     * org.apache.spark.sql.catalyst.analysis.UnresolvedException:
-     *    Invalid call to dataType on unresolved object, tree: 'data
-     *  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType
-     *    (unresolved.scala:59)
-     *  at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$method$1.apply(hiveUDFs.scala:119)
-     * ...
-     */
-    val rows = DummyInputData.select(sigmoid($"c0")).collect
-    assert(rows(0).getDouble(0) ~== 0.500)
-    assert(rows(1).getDouble(0) ~== 0.731)
-    assert(rows(2).getDouble(0) ~== 0.880)
-    assert(rows(3).getDouble(0) ~== 0.952)
-  }
-
-  test("misc - lr_datagen") {
-    assert(TinyTrainData.lr_datagen("-n_examples 100 -n_features 10 -seed 100").count >= 100)
-  }
-
-  test("invoke regression functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke classifier functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke multiclass classifier functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_multiclass_perceptron",
-      "train_multiclass_pa",
-      "train_multiclass_pa1",
-      "train_multiclass_pa2",
-      "train_multiclass_cw",
-      "train_multiclass_arow",
-      "train_multiclass_scw",
-      "train_multiclass_scw2"
-    ).map { func =>
-      // TODO: Why do only the multiclass classifiers restrict the label type to [Int|Text]?
-      invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label".cast(IntegerType)))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke random forest functions") {
-    import hiveContext.implicits._
-    val testDf = Seq(
-      (Array(0.3, 0.1, 0.2), 1),
-      (Array(0.3, 0.1, 0.2), 0),
-      (Array(0.3, 0.1, 0.2), 0)).toDF.as("features", "label")
-    Seq(
-      "train_randomforest_regr",
-      "train_randomforest_classifier"
-    ).map { func =>
-      invokeFunc(new HivemallOps(testDf.coalesce(1)), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  ignore("check regression precision") {
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      checkRegrPrecision(func)
-    }
-  }
-
-  ignore("check classifier precision") {
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      checkClassifierPrecision(func)
-    }
-  }
-
-  test("user-defined aggregators for ensembles") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.as("c0", "c1")
-    val row1 = df1.groupby($"c0").voted_avg("c1").collect
-    assert(row1(0).getDouble(1) ~== 0.15)
-    assert(row1(1).getDouble(1) ~== 0.10)
-
-    val df2 = Seq((1, 0.6f), (2, 0.2f), (1, 0.2f)).toDF.as("c0", "c1")
-    val row2 = df2.groupby($"c0").agg("c1" -> "voted_avg").collect
-    assert(row2(0).getDouble(1) ~== 0.40)
-    assert(row2(1).getDouble(1) ~== 0.20)
-
-    val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.as("c0", "c1")
-    val row3 = df3.groupby($"c0").weight_voted_avg("c1").collect
-    assert(row3(0).getDouble(1) ~== 0.50)
-    assert(row3(1).getDouble(1) ~== 0.30)
-
-    val df4 = Seq((1, 0.1f), (2, 0.9f), (1, 0.1f)).toDF.as("c0", "c1")
-    val row4 = df4.groupby($"c0").agg("c1" -> "weight_voted_avg").collect
-    assert(row4(0).getDouble(1) ~== 0.10)
-    assert(row4(1).getDouble(1) ~== 0.90)
-
-    val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.as("c0", "c1", "c2")
-    val row5 = df5.groupby($"c0").argmin_kld("c1", "c2").collect
-    assert(row5(0).getFloat(1) ~== 0.266666666)
-    assert(row5(1).getFloat(1) ~== 0.80)
-
-    val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.as("c0", "c1", "c2")
-    val row6 = df6.groupby($"c0").max_label("c2", "c1").collect
-    assert(row6(0).getString(1) == "id-1")
-
-    val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.as("c0", "c1", "c2")
-    val row7 = df7.groupby($"c0").maxrow("c2", "c1").as("c0", "c1").select($"c1.col1").collect
-    assert(row7(0).getString(0) == "id-0")
-
-    // val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.as("c0", "c1")
-    // val row8 = df8.groupby($"c0").rf_ensemble("c1").as("c0", "c1")
-    //    .select("c1.probability").collect
-    // assert(row8(0).getDouble(0) ~== 0.3333333333)
-    // assert(row8(1).getDouble(0) ~== 1.0)
-
-    // val df9 = Seq((1, 3), (1, 8), (2, 9), (1, 1)).toDF.as("c0", "c1")
-    // val row9 = df9.groupby($"c0").agg("c1" -> "rf_ensemble").as("c0", "c1")
-    //   .select("c1.probability").collect
-    // assert(row9(0).getDouble(0) ~== 0.3333333333)
-    // assert(row9(1).getDouble(0) ~== 1.0)
-  }
-
-  test("user-defined aggregators for evaluation") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF.as("c0", "c1", "c2")
-    val row1 = df1.groupby($"c0").mae("c1", "c2").collect
-    assert(row1(0).getDouble(1) ~== 0.26666666)
-
-    val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
-    val row2 = df2.groupby($"c0").mse("c1", "c2").collect
-    assert(row2(0).getDouble(1) ~== 0.29999999)
-
-    val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.as("c0", "c1", "c2")
-    val row3 = df3.groupby($"c0").rmse("c1", "c2").collect
-    assert(row3(0).getDouble(1) ~== 0.54772253)
-
-    val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF
-      .as("c0", "c1", "c2")
-    val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
-    assert(row4(0).getDouble(1) ~== 0.25)
-  }
-
-  ignore("user-defined aggregators for ftvec.trans") {
-    import hiveContext.implicits._
-
-    val df0 = Seq((1, "cat", "mammal", 9), (1, "dog", "mammal", 10), (1, "human", "mammal", 10),
-      (1, "seahawk", "bird", 101), (1, "wasp", "insect", 3), (1, "wasp", "insect", 9),
-      (1, "cat", "mammal", 101), (1, "dog", "mammal", 1), (1, "human", "mammal", 9))
-      .toDF("col0", "cat1", "cat2", "cat3")
-    val row00 = df0.groupby($"col0").onehot_encoding("cat1")
-    val row01 = df0.groupby($"col0").onehot_encoding("cat1", "cat2", "cat3")
-
-    val result000 = row00.collect()(0).getAs[Row](1).getAs[Map[String, Int]](0)
-    val result01 = row01.collect()(0).getAs[Row](1)
-    val result010 = result01.getAs[Map[String, Int]](0)
-    val result011 = result01.getAs[Map[String, Int]](1)
-    val result012 = result01.getAs[Map[String, Int]](2)
-
-    assert(result000.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
-    assert(result000.values.toSet === Set(1, 2, 3, 4, 5))
-    assert(result010.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
-    assert(result010.values.toSet === Set(1, 2, 3, 4, 5))
-    assert(result011.keySet === Set("bird", "insect", "mammal"))
-    assert(result011.values.toSet === Set(6, 7, 8))
-    assert(result012.keySet === Set(1, 3, 9, 10, 101))
-    assert(result012.values.toSet === Set(9, 10, 11, 12, 13))
-  }
-
-  test("user-defined aggregators for ftvec.selection") {
-    import hiveContext.implicits._
-
-    // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest
-    // binary class
-    // +-----------------+-------+
-    // |     features    | class |
-    // +-----------------+-------+
-    // | 5.1,3.5,1.4,0.2 |     0 |
-    // | 4.9,3.0,1.4,0.2 |     0 |
-    // | 4.7,3.2,1.3,0.2 |     0 |
-    // | 7.0,3.2,4.7,1.4 |     1 |
-    // | 6.4,3.2,4.5,1.5 |     1 |
-    // | 6.9,3.1,4.9,1.5 |     1 |
-    // +-----------------+-------+
-    val df0 = Seq(
-      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0)),
-      (1, Seq(4.7, 3.2, 1.3, 0.2), Seq(1, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1)),
-      (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1)), (1, Seq(6.9, 3.1, 4.9, 1.5), Seq(0, 1)))
-      .toDF("c0", "arg0", "arg1")
-    val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
-    (row0(0).getAs[Seq[Double]](1), Seq(4.38425236, 0.26390002, 15.83984511, 26.87005769))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-
-    // multiple class
-    // +-----------------+-------+
-    // |     features    | class |
-    // +-----------------+-------+
-    // | 5.1,3.5,1.4,0.2 |     0 |
-    // | 4.9,3.0,1.4,0.2 |     0 |
-    // | 7.0,3.2,4.7,1.4 |     1 |
-    // | 6.4,3.2,4.5,1.5 |     1 |
-    // | 6.3,3.3,6.0,2.5 |     2 |
-    // | 5.8,2.7,5.1,1.9 |     2 |
-    // +-----------------+-------+
-    val df1 = Seq(
-      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
-      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
-      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
-      .toDF("c0", "arg0", "arg1")
-    val row1 = df1.groupby($"c0").snr("arg0", "arg1").collect
-    (row1(0).getAs[Seq[Double]](1), Seq(8.43181818, 1.32121212, 42.94949495, 33.80952381))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-  }
-
-  test("user-defined aggregators for tools.matrix") {
-    import hiveContext.implicits._
-
-    // | 1  2  3 |T    | 5  6  7 |
-    // | 3  4  5 |  *  | 7  8  9 |
-    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
-      .toDF("c0", "arg0", "arg1")
-
-    // If we use checkAnswer here, it fails for some reason (maybe a type issue?), but it works on spark-2.0
-    assert(df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect() ===
-      Seq(Row(1, Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))))
-  }
-}
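
A note for reading the floating-point assertions above: `~==` is not a Spark or ScalaTest operator; it is an approximate Double comparison pulled in from TestUtils.scala, which is removed later in this same commit. A minimal sketch of that pattern, with the 0.001 tolerance mirroring the removed helper:

    // Approximate equality with an absolute tolerance of 0.001, backing the `~==`
    // assertions above (mirrors TestDoubleWrapper from the removed TestUtils.scala).
    class TestDoubleWrapper(d: Double) {
      def ~==(that: Double): Boolean = Math.abs(d - that) < 0.001
    }
    object TestDoubleWrapper {
      implicit def toTestDoubleWrapper(d: Double): TestDoubleWrapper = new TestDoubleWrapper(d)
    }
    // e.g. assert(row1(0).getFloat(0) ~== 0.500f)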

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
deleted file mode 100644
index 276bc13..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.io.{BufferedInputStream, BufferedReader, InputStream, InputStreamReader}
-import java.net.URL
-import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService}
-
-import hivemall.mix.server.MixServer
-import hivemall.mix.server.MixServer.ServerState
-import hivemall.utils.lang.CommandLineUtils
-import hivemall.utils.net.NetUtils
-import org.apache.commons.cli.Options
-import org.apache.commons.compress.compressors.CompressorStreamFactory
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.sql.{Column, DataFrame, Row}
-import org.apache.spark.sql.functions.when
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.test.TestUtils.{benchmark, invokeFunc}
-
-final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
-
-  // Load A9a training and test data
-  val a9aLineParser = (line: String) => {
-    val elements = line.split(" ")
-    val (label, features) = (elements.head, elements.tail)
-    HivemallLabeledPoint(if (label == "+1") 1.0f else 0.0f, features)
-  }
-
-  lazy val trainA9aData: DataFrame =
-    getDataFromURI(
-      new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a").openStream,
-      a9aLineParser)
-
-  lazy val testA9aData: DataFrame =
-    getDataFromURI(
-      new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a.t").openStream,
-      a9aLineParser)
-
-  // Load KDD2010a training and test data
-  val kdd2010aLineParser = (line: String) => {
-    val elements = line.split(" ")
-    val (label, features) = (elements.head, elements.tail)
-    HivemallLabeledPoint(if (label == "1") 1.0f else 0.0f, features)
-  }
-
-  lazy val trainKdd2010aData: DataFrame =
-    getDataFromURI(
-      new CompressorStreamFactory().createCompressorInputStream(
-        new BufferedInputStream(
-          new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.bz2")
-            .openStream
-        )
-      ),
-      kdd2010aLineParser,
-      8)
-
-  lazy val testKdd2010aData: DataFrame =
-    getDataFromURI(
-      new CompressorStreamFactory().createCompressorInputStream(
-        new BufferedInputStream(
-          new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2")
-            .openStream
-        )
-      ),
-      kdd2010aLineParser,
-      8)
-
-  // Placeholder for a mix server
-  var mixServExec: ExecutorService = _
-  var assignedPort: Int = _
-
-  private def getDataFromURI(
-      in: InputStream, lineParseFunc: String => HivemallLabeledPoint, numPart: Int = 2)
-    : DataFrame = {
-    val reader = new BufferedReader(new InputStreamReader(in))
-    try {
-      // Cache all data because the stream is closed soon
-      val lines = FileIterator(reader.readLine()).toSeq
-      val rdd = TestHive.sparkContext.parallelize(lines, numPart).map(lineParseFunc)
-      val df = rdd.toDF.cache
-      df.foreach(_ => {})
-      df
-    } finally {
-      reader.close()
-    }
-  }
-
-  before {
-    assert(mixServExec == null)
-
-    // Launch a MIX server in a separate thread
-    assignedPort = NetUtils.getAvailablePort
-    val method = classOf[MixServer].getDeclaredMethod("getOptions")
-    method.setAccessible(true)
-    val options = method.invoke(null).asInstanceOf[Options]
-    val cl = CommandLineUtils.parseOptions(
-      Array(
-        "-port", Integer.toString(assignedPort),
-        "-sync_threshold", "1"
-      ),
-      options
-    )
-    val server = new MixServer(cl)
-    mixServExec = Executors.newSingleThreadExecutor()
-    mixServExec.submit(server)
-    var retry = 0
-    while (server.getState() != ServerState.RUNNING && retry < 32) {
-      Thread.sleep(100L)
-      retry += 1
-    }
-    assert(ServerState.RUNNING == server.getState)
-  }
-
-  after {
-    mixServExec.shutdownNow()
-    mixServExec = null
-  }
-
-  benchmark("model mixing test w/ regression") {
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      // Build a model
-      val model = {
-        val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = invokeFunc(new HivemallOps(trainA9aData.part_amplify(1)), func,
-          Seq[Column](add_bias($"features"), $"label",
-            s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel"))
-        if (!res.columns.contains("conv")) {
-          res.groupby("feature").agg("weight" -> "avg")
-        } else {
-          res.groupby("feature").argmin_kld("weight", "conv")
-        }
-      }.as("feature", "weight")
-
-      // Data preparation
-      val testDf = testA9aData
-        .select(rowid(), $"label".as("target"), $"features")
-        .cache
-
-      val testDf_exploded = testDf
-        .explode_array($"features")
-        .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
-      // Do prediction
-      val predict = testDf_exploded
-        .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
-        .select($"rowid", ($"weight" * $"value").as("value"))
-        .groupby("rowid").sum("value")
-        .as("rowid", "predicted")
-
-      // Evaluation
-      val eval = predict
-        .join(testDf, predict("rowid") === testDf("rowid"))
-        .groupby()
-        .agg(Map("target" -> "avg", "predicted" -> "avg"))
-        .as("target", "predicted")
-
-      val (target, predicted) = eval.map {
-        case Row(target: Double, predicted: Double) => (target, predicted)
-      }.first
-
-      // scalastyle:off println
-      println(s"func:${func} target:${target} predicted:${predicted} "
-        + s"diff:${Math.abs(target - predicted)}")
-
-      testDf.unpersist()
-    }
-  }
-
-  benchmark("model mixing test w/ binary classification") {
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      // Build a model
-      val model = {
-        val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = invokeFunc(new HivemallOps(trainKdd2010aData.part_amplify(1)), func,
-          Seq[Column](add_bias($"features"), $"label",
-            s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel"))
-        if (!res.columns.contains("conv")) {
-          res.groupby("feature").agg("weight" -> "avg")
-        } else {
-          res.groupby("feature").argmin_kld("weight", "conv")
-        }
-      }.as("feature", "weight")
-
-      // Data preparation
-      val testDf = testKdd2010aData
-        .select(rowid(), $"label".as("target"), $"features")
-        .cache
-
-      val testDf_exploded = testDf
-        .explode_array($"features")
-        .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
-      // Do prediction
-      val predict = testDf_exploded
-        .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
-        .select($"rowid", ($"weight" * $"value").as("value"))
-        .groupby("rowid").sum("value")
-        .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
-        .as("rowid", "predicted")
-
-      // Evaluation
-      val eval = predict
-        .join(testDf, predict("rowid") === testDf("rowid"))
-        .where($"target" === $"predicted")
-
-      // scalastyle:off println
-      println(s"func:${func} precision:${(eval.count + 0.0) / predict.count}")
-
-      testDf.unpersist()
-      predict.unpersist()
-    }
-  }
-}
-
-object FileIterator {
-
-  def apply[A](f: => A): Iterator[A] = new Iterator[A] {
-    var opt = Option(f)
-    def hasNext = opt.nonEmpty
-    def next() = {
-      val r = opt.get
-      opt = Option(f)
-      r
-    }
-  }
-}
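
The FileIterator helper above wraps a repeatedly evaluated by-name expression into an Iterator that stops at the first null, which is how getDataFromURI drains each downloaded dataset stream. A small usage sketch (the StringReader input is illustrative only, and FileIterator from the file above is assumed to be in scope):

    import java.io.{BufferedReader, StringReader}

    // readLine() is re-evaluated on every next(); iteration ends at the first null.
    val reader = new BufferedReader(new StringReader("line1\nline2\nline3"))
    val lines: Seq[String] = FileIterator(reader.readLine()).toSeq
    assert(lines == Seq("line1", "line2", "line3"))
    reader.close()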

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
deleted file mode 100644
index 5cc590a..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/streaming/HivemallOpsSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.streaming
-
-import reflect.ClassTag
-
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.streaming.HivemallStreamingOps._
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.test.HivemallQueryTest
-
-/**
- * This is an input stream just for tests.
- */
-private[this] class TestInputStream[T: ClassTag](
-    ssc: StreamingContext,
-    input: Seq[Seq[T]],
-    numPartitions: Int) extends InputDStream[T](ssc) {
-
-  override def start() {}
-
-  override def stop() {}
-
-  override def compute(validTime: Time): Option[RDD[T]] = {
-    logInfo("Computing RDD for time " + validTime)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
-    // lets us test cases where RDDs are not created
-    if (selectedInput == null) {
-      return None
-    }
-
-    // Report the input data's information to InputInfoTracker for testing
-    val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
-    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
-    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
-    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
-    Some(rdd)
-  }
-}
-
-final class HivemallOpsSuite extends HivemallQueryTest {
-
-  // This implicit value is used in `HivemallStreamingOps`
-  implicit val sqlCtx = sqlContext
-
-  /**
-   * Run a block of code with the given StreamingContext.
-   * This method does not stop the given SparkContext because other tests share the context.
-   */
-  private def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): Unit = {
-    try {
-      block(ssc)
-      ssc.start()
-      ssc.awaitTerminationOrTimeout(10 * 1000) // 10s wait
-    } finally {
-      try {
-        ssc.stop(stopSparkContext = false)
-      } catch {
-        case e: Exception => logError("Error stopping StreamingContext", e)
-      }
-    }
-  }
-
-  test("streaming") {
-    import sqlCtx.implicits._
-
-    // We assume we build a model in advance
-    val testModel = Seq(
-      ("0", 0.3f), ("1", 0.1f), ("2", 0.6f), ("3", 0.2f)
-    ).toDF("feature", "weight")
-
-    withStreamingContext(new StreamingContext(sqlCtx.sparkContext, Milliseconds(100))) { ssc =>
-      val inputData = Seq(
-        Seq(HivemallLabeledPoint(features = "1:0.6" :: "2:0.1" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "2:0.9" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "1:0.2" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "2:0.1" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "0:0.6" :: "2:0.4" :: Nil))
-      )
-
-      val inputStream = new TestInputStream[HivemallLabeledPoint](ssc, inputData, 1)
-
-      // Apply predictions on input streams
-      val prediction = inputStream.predict { streamDf =>
-          val df = streamDf.select(rowid(), $"features").explode_array($"features")
-          val testDf = df.select(
-            // TODO: `$"feature"` throws AnalysisException, why?
-            $"rowid", extract_feature(df("feature")), extract_weight(df("feature"))
-          )
-          testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
-            .select($"rowid", ($"weight" * $"value").as("value"))
-            .groupby("rowid").sum("value")
-            .as("rowid", "value")
-            .select($"rowid", sigmoid($"value"))
-        }
-
-      // Dummy output stream
-      prediction.foreachRDD(_ => {})
-    }
-  }
-}
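
The `predict` call above is provided by `HivemallStreamingOps._`, which this commit does not touch; conceptually it applies a DataFrame transformation to every micro-batch of the input stream. A hedged sketch of that pattern — not the actual HivemallStreamingOps source — assuming an implicit SQLContext is in scope:

    import org.apache.spark.ml.feature.HivemallLabeledPoint
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.streaming.dstream.DStream

    // Sketch only: turn each micro-batch of labeled points into a DataFrame,
    // apply the user-supplied transformation, and flatten back into a DStream of Rows.
    class StreamingPredictSketch(stream: DStream[HivemallLabeledPoint])(implicit sqlContext: SQLContext) {
      import sqlContext.implicits._
      def predict(f: DataFrame => DataFrame): DStream[Row] =
        stream.transform { rdd => f(rdd.toDF()).rdd }
    }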

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
deleted file mode 100644
index 020cf8c..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/test/HivemallQueryTest.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.collection.mutable.Seq
-
-import hivemall.tools.RegressionDatagen
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.functions
-import org.apache.spark.sql.hive.HivemallOps
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.test.TestUtils._
-
-/**
- * Base class for tests that share a SparkContext
- */
-abstract class HivemallQueryTest extends QueryTest with TestHiveSingleton {
-
-  import hiveContext.implicits._
-
-  protected val DummyInputData =
-    Seq(
-      (0, 0), (1, 1), (2, 2), (3, 3)
-    ).toDF("c0", "c1")
-
-  protected val IntList2Data =
-    Seq(
-      (8 :: 5 :: Nil, 6 :: 4 :: Nil),
-      (3 :: 1 :: Nil, 3 :: 2 :: Nil),
-      (2 :: Nil, 3 :: Nil)
-    ).toDF("target", "predict")
-
-  protected val Float2Data =
-    Seq(
-      (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f)
-    ).toDF("target", "predict")
-
-  protected val TinyTrainData =
-    Seq(
-      (0.0, "1:0.8" :: "2:0.2" :: Nil),
-      (1.0, "2:0.7" :: Nil),
-      (0.0, "1:0.9" :: Nil)
-    ).toDF("label", "features")
-
-  protected val TinyTestData =
-    Seq(
-      (0.0, "1:0.6" :: "2:0.1" :: Nil),
-      (1.0, "2:0.9" :: Nil),
-      (0.0, "1:0.2" :: Nil),
-      (0.0, "2:0.1" :: Nil),
-      (0.0, "0:0.6" :: "2:0.4" :: Nil)
-    ).toDF("label", "features")
-
-  protected val LargeRegrTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 3,
-      prob_one = 0.8f
-    ).cache
-
-  protected val LargeRegrTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 3,
-      prob_one = 0.5f
-    ).cache
-
-  protected val LargeClassifierTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 5,
-      prob_one = 0.8f,
-      cl = true
-    ).cache
-
-  protected val LargeClassifierTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 5,
-      prob_one = 0.5f,
-      cl = true
-    ).cache
-
-  protected def checkRegrPrecision(func: String): Unit = {
-    // Build a model
-    val model = {
-      val res = invokeFunc(new HivemallOps(LargeRegrTrainData),
-        func, Seq(add_bias($"features"), $"label"))
-      if (!res.columns.contains("conv")) {
-        res.groupby("feature").agg("weight" -> "avg")
-      } else {
-        res.groupby("feature").argmin_kld("weight", "conv")
-      }
-    }.as("feature", "weight")
-
-    // Data preparation
-    val testDf = LargeRegrTrainData
-      .select(rowid(), $"label".as("target"), $"features")
-      .cache
-
-    val testDf_exploded = testDf
-      .explode_array($"features")
-      .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
-    // Do prediction
-    val predict = testDf_exploded
-      .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
-      .select($"rowid", ($"weight" * $"value").as("value"))
-      .groupby("rowid").sum("value")
-      .as("rowid", "predicted")
-
-    // Evaluation
-    val eval = predict
-      .join(testDf, predict("rowid") === testDf("rowid"))
-      .groupby()
-      .agg(Map("target" -> "avg", "predicted" -> "avg"))
-      .as("target", "predicted")
-
-    val diff = eval.map {
-      case Row(target: Double, predicted: Double) =>
-        Math.abs(target - predicted)
-    }.first
-
-    TestUtils.expectResult(diff > 0.10,
-      s"Low precision -> func:${func} diff:${diff}")
-  }
-
-  protected def checkClassifierPrecision(func: String): Unit = {
-    // Build a model
-    val model = {
-      val res = invokeFunc(new HivemallOps(LargeClassifierTrainData),
-        func, Seq(add_bias($"features"), $"label"))
-      if (!res.columns.contains("conv")) {
-        res.groupby("feature").agg("weight" -> "avg")
-      } else {
-        res.groupby("feature").argmin_kld("weight", "conv")
-      }
-    }.as("feature", "weight")
-
-    // Data preparation
-    val testDf = LargeClassifierTestData
-      .select(rowid(), $"label".as("target"), $"features")
-      .cache
-
-    val testDf_exploded = testDf
-      .explode_array($"features")
-      .select($"rowid", extract_feature($"feature"), extract_weight($"feature"))
-
-    // Do prediction
-    val predict = testDf_exploded
-      .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
-      .select($"rowid", ($"weight" * $"value").as("value"))
-      .groupby("rowid").sum("value")
-      /**
-       * TODO: This sentence throws an exception below:
-       *
-       * WARN Column: Constructing trivially true equals predicate, 'rowid#1323 = rowid#1323'.
-       * Perhaps you need to use aliases.
-       */
-      // .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0)
-      // .as("predicted"))
-      .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
-      .as("rowid", "predicted")
-
-    // Evaluation
-    val eval = predict
-      .join(testDf, predict("rowid") === testDf("rowid"))
-      .where($"target" === $"predicted")
-
-    val precision = (eval.count + 0.0) / predict.count
-
-    TestUtils.expectResult(precision < 0.70,
-      s"Low precision -> func:${func} value:${precision}")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
deleted file mode 100644
index 9c77fae..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/test/TestUtils.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.reflect.runtime.{universe => ru}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.DataFrame
-
-object TestUtils extends Logging {
-
-  // Run the benchmark only if DEBUG logging is enabled
-  def benchmark(benchName: String)(testFunc: => Unit): Unit = {
-    if (log.isDebugEnabled) {
-      testFunc
-    }
-  }
-
-  def expectResult(res: Boolean, errMsg: String): Unit = if (res) {
-    logWarning(errMsg)
-  }
-
-  def invokeFunc(cls: Any, func: String, args: Any*): DataFrame = try {
-    // Invoke a function with the given name via reflection
-    val im = scala.reflect.runtime.currentMirror.reflect(cls)
-    val mSym = im.symbol.typeSignature.member(ru.newTermName(func)).asMethod
-    im.reflectMethod(mSym).apply(args: _*)
-      .asInstanceOf[DataFrame]
-  } catch {
-    case e: Exception =>
-      assert(false, s"Invoking ${func} failed because: ${e.getMessage}")
-      null // Not executed
-  }
-}
-
-// TODO: Is there an equivalent helper in o.a.spark.*?
-class TestDoubleWrapper(d: Double) {
-  // Check approximate equality between Double values
-  def ~==(d: Double): Boolean = Math.abs(this.d - d) < 0.001
-}
-
-object TestDoubleWrapper {
-  @inline implicit def toTestDoubleWrapper(d: Double): TestDoubleWrapper = {
-    new TestDoubleWrapper(d)
-  }
-}


[3/3] incubator-hivemall git commit: Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support

Posted by my...@apache.org.
Close #67: [HIVEMALL-55][SPARK] Drop the Spark-v1.6 support


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/c53b9ff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/c53b9ff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/c53b9ff9

Branch: refs/heads/master
Commit: c53b9ff9b23755c7d211390ea97bc911a80674bf
Parents: 8dc3a02
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Apr 10 06:43:20 2017 +0900
Committer: myui <yu...@gmail.com>
Committed: Mon Apr 10 06:43:20 2017 +0900

----------------------------------------------------------------------
 .travis.yml                                     |    3 +-
 NOTICE                                          |    1 -
 bin/format_header.sh                            |    3 -
 pom.xml                                         |   11 -
 spark/spark-1.6/bin/mvn-zinc                    |   99 --
 spark/spark-1.6/extra-src/README                |    1 -
 .../org/apache/spark/sql/hive/HiveShim.scala    |  527 --------
 spark/spark-1.6/pom.xml                         |  261 ----
 .../src/main/resources/log4j.properties         |   12 -
 .../hivemall/tools/RegressionDatagen.scala      |   66 -
 .../apache/spark/sql/hive/GroupedDataEx.scala   |  303 -----
 .../org/apache/spark/sql/hive/HivemallOps.scala | 1125 ------------------
 .../apache/spark/sql/hive/HivemallUtils.scala   |  112 --
 .../src/test/resources/log4j.properties         |    7 -
 .../hivemall/mix/server/MixServerSuite.scala    |  123 --
 .../hivemall/tools/RegressionDatagenSuite.scala |   32 -
 .../scala/org/apache/spark/SparkFunSuite.scala  |   48 -
 .../ml/feature/HivemallLabeledPointSuite.scala  |   35 -
 .../scala/org/apache/spark/sql/QueryTest.scala  |  295 -----
 .../spark/sql/catalyst/plans/PlanTest.scala     |   61 -
 .../apache/spark/sql/hive/HiveUdfSuite.scala    |  113 --
 .../spark/sql/hive/HivemallOpsSuite.scala       |  665 -----------
 .../spark/sql/hive/ModelMixingSuite.scala       |  272 -----
 .../spark/streaming/HivemallOpsSuite.scala      |  123 --
 .../apache/spark/test/HivemallQueryTest.scala   |  197 ---
 .../scala/org/apache/spark/test/TestUtils.scala |   62 -
 26 files changed, 1 insertion(+), 4556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a4cb7ea..323e36a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,9 +35,8 @@ notifications:
 
 script:
   - mvn -q scalastyle:check test -Pspark-2.1
-  # test the spark-1.6/2.0 modules only in the following runs
+  # test the spark-2.0 modules only in the following runs
   - mvn -q scalastyle:check clean -Pspark-2.0 -pl spark/spark-2.0 -am test -Dtest=none
-  - mvn -q scalastyle:check clean -Pspark-1.6 -pl spark/spark-1.6 -am test -Dtest=none
 
 after_success:
   - mvn clean cobertura:cobertura coveralls:report

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 0911f50..79a944c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -50,7 +50,6 @@ o hivemall/core/src/main/java/hivemall/utils/buffer/DynamicByteArray.java
     https://orc.apache.org/
     Licensed under the Apache License, Version 2.0
 
-o hivemall/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
   hivemall/spark/spark-2.0/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
   hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/QueryTest.scala
   hivemall/spark/spark-2.0/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/bin/format_header.sh
----------------------------------------------------------------------
diff --git a/bin/format_header.sh b/bin/format_header.sh
index da67420..f2c063b 100755
--- a/bin/format_header.sh
+++ b/bin/format_header.sh
@@ -37,8 +37,5 @@ mvn license:format
 cd $HIVEMALL_HOME/spark/spark-common
 mvn license:format -P spark-2.0
 
-cd $HIVEMALL_HOME/spark/spark-1.6
-mvn license:format -P spark-1.6
-
 cd $HIVEMALL_HOME/spark/spark-2.0
 mvn license:format -P spark-2.0

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2276e9..7743d5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -293,17 +293,6 @@
 			</properties>
 		</profile>
 		<profile>
-			<id>spark-1.6</id>
-			<modules>
-				<module>spark/spark-1.6</module>
-				<module>spark/spark-common</module>
-			</modules>
-			<properties>
-				<spark.version>1.6.2</spark.version>
-				<spark.binary.version>1.6</spark.binary.version>
-			</properties>
-		</profile>
-		<profile>
 			<id>compile-xgboost</id>
 			<build>
 				<plugins>

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/bin/mvn-zinc
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/bin/mvn-zinc b/spark/spark-1.6/bin/mvn-zinc
deleted file mode 100755
index 759b0a5..0000000
--- a/spark/spark-1.6/bin/mvn-zinc
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Copied from commit 48682f6bf663e54cb63b7e95a4520d34b6fa890b in Apache Spark
-
-# Determine the current working directory
-_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-# Preserve the calling directory
-_CALLING_DIR="$(pwd)"
-# Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
-
-# Installs any application tarball given a URL, the expected tarball name,
-# and, optionally, a checkable binary path to determine if the binary has
-# already been installed
-## Arg1 - URL
-## Arg2 - Tarball Name
-## Arg3 - Checkable Binary
-install_app() {
-  local remote_tarball="$1/$2"
-  local local_tarball="${_DIR}/$2"
-  local binary="${_DIR}/$3"
-  local curl_opts="--progress-bar -L"
-  local wget_opts="--progress=bar:force ${wget_opts}"
-
-  if [ -z "$3" -o ! -f "$binary" ]; then
-    # check if we already have the tarball
-    # check if we have curl installed
-    # download application
-    [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
-      echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \
-      curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
-    # if the file still doesn't exist, lets try `wget` and cross our fingers
-    [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
-      echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \
-      wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
-    # if both were unsuccessful, exit
-    [ ! -f "${local_tarball}" ] && \
-      echo -n "ERROR: Cannot download $2 with cURL or wget; " && \
-      echo "please install manually and try again." && \
-      exit 2
-    cd "${_DIR}" && tar -xzf "$2"
-    rm -rf "$local_tarball"
-  fi
-}
-
-# Install zinc under the bin/ folder
-install_zinc() {
-  local zinc_path="zinc-0.3.9/bin/zinc"
-  [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
-  install_app \
-    "http://downloads.typesafe.com/zinc/0.3.9" \
-    "zinc-0.3.9.tgz" \
-    "${zinc_path}"
-  ZINC_BIN="${_DIR}/${zinc_path}"
-}
-
-# Setup healthy defaults for the Zinc port if none were provided from
-# the environment
-ZINC_PORT=${ZINC_PORT:-"3030"}
-
-# Install Zinc for the bin/
-install_zinc
-
-# Reset the current working directory
-cd "${_CALLING_DIR}"
-
-# Now that zinc is ensured to be installed, check its status and, if its
-# not running or just installed, start it
-if [ ! -f "${ZINC_BIN}" ]; then
-  exit -1
-fi
-if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
-  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
-  "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
-  "${ZINC_BIN}" -start -port ${ZINC_PORT} &>/dev/null
-fi
-
-# Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
-
-# Last, call the `mvn` command as usual
-mvn -DzincPort=${ZINC_PORT} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/README
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/README b/spark/spark-1.6/extra-src/README
deleted file mode 100644
index 8b5d0cd..0000000
--- a/spark/spark-1.6/extra-src/README
+++ /dev/null
@@ -1 +0,0 @@
-Copied from spark master [commit 908e37bcc10132bb2aa7f80ae694a9df6e40f31a]

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
deleted file mode 100644
index 6da54f7..0000000
--- a/spark/spark-1.6/extra-src/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.client
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
-import java.net.URI
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{IntegralType, StringType}
-
-/**
- * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
- * talk to the metastore. Each Hive version has its own implementation of this class, defining
- * version-specific version of needed functions.
- *
- * The guideline for writing shims is:
- * - always extend from the previous version unless really not possible
- * - initialize methods in lazy vals, both for quicker access for multiple invocations, and to
- *   avoid runtime errors due to the above guideline.
- */
-private[client] sealed abstract class Shim {
-
-  /**
-   * Set the current SessionState to the given SessionState. Also, set the context classloader of
-   * the current thread to the one set in the HiveConf of this given `state`.
-   * @param state
-   */
-  def setCurrentSessionState(state: SessionState): Unit
-
-  /**
-   * This shim is necessary because the return type is different on different versions of Hive.
-   * All parameters are the same, though.
-   */
-  def getDataLocation(table: Table): Option[String]
-
-  def setDataLocation(table: Table, loc: String): Unit
-
-  def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
-
-  def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
-  def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
-
-  def getDriverResults(driver: Driver): Seq[String]
-
-  def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
-
-  def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
-
-  def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit
-
-  def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit
-
-  def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
-
-  protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
-    val method = findMethod(klass, name, args: _*)
-    require(Modifier.isStatic(method.getModifiers()),
-      s"Method $name of class $klass is not static.")
-    method
-  }
-
-  protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
-    klass.getMethod(name, args: _*)
-  }
-
-}
-
-private[client] class Shim_v0_12 extends Shim with Logging {
-
-  private lazy val startMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "start",
-      classOf[SessionState])
-  private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation")
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[URI])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsForPruner",
-      classOf[Table])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[String],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JArrayList[String]])
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val dropIndexMethod =
-    findMethod(
-      classOf[Hive],
-      "dropIndex",
-      classOf[String],
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE)
-
-  override def setCurrentSessionState(state: SessionState): Unit = {
-    // Starting from Hive 0.13, setCurrentSessionState will internally override
-    // the context class loader of the current thread by the class loader set in
-    // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same
-    // behavior and make shim.setCurrentSessionState of all Hive versions have the
-    // consistent behavior.
-    Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
-    startMethod.invoke(null, state)
-  }
-
-  override def getDataLocation(table: Table): Option[String] =
-    Option(getDataLocationMethod.invoke(table)).map(_.toString())
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new URI(loc))
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
-    getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-    // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
-    // See HIVE-4888.
-    logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
-      "Please use Hive 0.13 or higher.")
-    getAllPartitions(hive, table)
-  }
-
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
-
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[String]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala
-  }
-
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
-  }
-
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
-  }
-
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
-  }
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
-  }
-
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
-  }
-
-}
-
-private[client] class Shim_v0_13 extends Shim_v0_12 {
-
-  private lazy val setCurrentSessionStateMethod =
-    findStaticMethod(
-      classOf[SessionState],
-      "setCurrentSessionState",
-      classOf[SessionState])
-  private lazy val setDataLocationMethod =
-    findMethod(
-      classOf[Table],
-      "setDataLocation",
-      classOf[Path])
-  private lazy val getAllPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "getAllPartitionsOf",
-      classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
-  private lazy val getCommandProcessorMethod =
-    findStaticMethod(
-      classOf[CommandProcessorFactory],
-      "get",
-      classOf[Array[String]],
-      classOf[HiveConf])
-  private lazy val getDriverResultsMethod =
-    findMethod(
-      classOf[Driver],
-      "getResults",
-      classOf[JList[Object]])
-
-  override def setCurrentSessionState(state: SessionState): Unit =
-    setCurrentSessionStateMethod.invoke(null, state)
-
-  override def setDataLocation(table: Table, loc: String): Unit =
-    setDataLocationMethod.invoke(table, new Path(loc))
-
-  override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
-    getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
-
-  /**
-   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
-   * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
-   *
-   * Unsupported predicates are skipped.
-   */
-  def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.collect {
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
-          if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} "$v""""
-      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s""""$v" ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
-  }
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates)
-    val partitions =
-      if (filter.isEmpty) {
-        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
-      }
-
-    partitions.asScala.toSeq
-  }
-
-  override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
-    getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
-
-  override def getDriverResults(driver: Driver): Seq[String] = {
-    val res = new JArrayList[Object]()
-    getDriverResultsMethod.invoke(driver, res)
-    res.asScala.map { r =>
-      r match {
-        case s: String => s
-        case a: Array[Object] => a(0).asInstanceOf[String]
-      }
-    }
-  }
-
-}
-
-private[client] class Shim_v0_14 extends Shim_v0_13 {
-
-  private lazy val loadPartitionMethod =
-    findMethod(
-      classOf[Hive],
-      "loadPartition",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadTableMethod =
-    findMethod(
-      classOf[Hive],
-      "loadTable",
-      classOf[Path],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-  private lazy val getTimeVarMethod =
-    findMethod(
-      classOf[HiveConf],
-      "getTimeVar",
-      classOf[HiveConf.ConfVars],
-      classOf[TimeUnit])
-
-  override def loadPartition(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
-    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
-  }
-
-  override def loadTable(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
-  }
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
-  }
-
-  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    getTimeVarMethod.invoke(
-      conf,
-      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
-      TimeUnit.MILLISECONDS).asInstanceOf[Long]
-  }
-
-  protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
-    val localFs = FileSystem.getLocal(conf)
-    val pathFs = FileSystem.get(path.toUri(), conf)
-    localFs.getUri() == pathFs.getUri()
-  }
-
-}
-
-private[client] class Shim_v1_0 extends Shim_v0_14 {
-
-}
-
-private[client] class Shim_v1_1 extends Shim_v1_0 {
-
-  private lazy val dropIndexMethod =
-    findMethod(
-      classOf[Hive],
-      "dropIndex",
-      classOf[String],
-      classOf[String],
-      classOf[String],
-      JBoolean.TYPE,
-      JBoolean.TYPE)
-
-  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
-    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
-  }
-
-}
-
-private[client] class Shim_v1_2 extends Shim_v1_1 {
-
-  private lazy val loadDynamicPartitionsMethod =
-    findMethod(
-      classOf[Hive],
-      "loadDynamicPartitions",
-      classOf[Path],
-      classOf[String],
-      classOf[JMap[String, String]],
-      JBoolean.TYPE,
-      JInteger.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JBoolean.TYPE,
-      JLong.TYPE)
-
-  override def loadDynamicPartitions(
-      hive: Hive,
-      loadPath: Path,
-      tableName: String,
-      partSpec: JMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = {
-    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE,
-      0L: JLong)
-  }
-
-}

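The HiveShim.scala file removed above follows Spark's reflection-based shim pattern: each Hive version gets a Shim subclass that looks methods up by name at runtime, so a single build can talk to several metastore versions. Below is a minimal, self-contained sketch of that pattern for context only; it is not part of this commit, and the GreeterShim/LegacyGreeter names are invented for the example rather than taken from Spark or Hivemall.

import java.lang.reflect.Method

// Hypothetical class standing in for a versioned dependency (Hive's Table/Hive classes in the code above).
class LegacyGreeter {
  def greet(name: String): String = s"Hello, $name"
}

// Like the removed Shim hierarchy: resolve the needed method lazily via reflection,
// then invoke it against whatever instance the caller supplies.
sealed abstract class GreeterShim {
  protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
    klass.getMethod(name, args: _*)
  def greet(target: AnyRef, name: String): String
}

final class GreeterShimV1 extends GreeterShim {
  private lazy val greetMethod = findMethod(classOf[LegacyGreeter], "greet", classOf[String])
  override def greet(target: AnyRef, name: String): String =
    greetMethod.invoke(target, name).asInstanceOf[String]
}

object ShimSketch {
  def main(args: Array[String]): Unit = {
    val shim: GreeterShim = new GreeterShimV1
    println(shim.greet(new LegacyGreeter, "Hive"))   // prints: Hello, Hive
  }
}
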
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/pom.xml b/spark/spark-1.6/pom.xml
deleted file mode 100644
index 98bff6b..0000000
--- a/spark/spark-1.6/pom.xml
+++ /dev/null
@@ -1,261 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>io.github.myui</groupId>
-		<artifactId>hivemall</artifactId>
-		<version>0.4.2-rc.2</version>
-		<relativePath>../../pom.xml</relativePath>
-	</parent>
-
-	<artifactId>hivemall-spark</artifactId>
-	<name>Hivemall on Spark 1.6</name>
-	<packaging>jar</packaging>
-
-	<properties>
-		<PermGen>64m</PermGen>
-		<MaxPermGen>512m</MaxPermGen>
-		<CodeCacheSize>512m</CodeCacheSize>
-		<main.basedir>${project.parent.basedir}</main.basedir>
-	</properties>
-
-	<dependencies>
-		<!-- hivemall dependencies -->
-		<dependency>
-			<groupId>io.github.myui</groupId>
-			<artifactId>hivemall-core</artifactId>
-			<version>${project.version}</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>io.github.myui</groupId>
-			<artifactId>hivemall-spark-common</artifactId>
-			<version>${project.version}</version>
-			<scope>compile</scope>
-		</dependency>
-
-		<!-- third-party dependencies -->
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-			<version>${scala.version}</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-compress</artifactId>
-			<version>1.8</version>
-			<scope>compile</scope>
-		</dependency>
-
-		<!-- other provided dependencies -->
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-sql_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-hive_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-streaming_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-mllib_${scala.binary.version}</artifactId>
-			<version>${spark.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- test dependencies -->
-		<dependency>
-			<groupId>io.github.myui</groupId>
-			<artifactId>hivemall-mixserv</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.xerial</groupId>
-			<artifactId>xerial-core</artifactId>
-			<version>3.2.3</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.scalatest</groupId>
-			<artifactId>scalatest_${scala.binary.version}</artifactId>
-			<version>2.2.4</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<directory>target</directory>
-		<outputDirectory>target/classes</outputDirectory>
-		<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
-		<testOutputDirectory>target/test-classes</testOutputDirectory>
-		<plugins>
-			<!-- For incremental compilation -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.2.2</version>
-				<executions>
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-					<execution>
-						<id>scala-test-compile-first</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<scalaVersion>${scala.version}</scalaVersion>
-					<recompileMode>incremental</recompileMode>
-					<useZincServer>true</useZincServer>
-					<args>
-						<arg>-unchecked</arg>
-						<arg>-deprecation</arg>
-						<!-- TODO: To enable this option, we need to fix many warnings -->
-						<!-- <arg>-feature</arg> -->
-					</args>
-					<jvmArgs>
-						<jvmArg>-Xms1024m</jvmArg>
-						<jvmArg>-Xmx1024m</jvmArg>
-						<jvmArg>-XX:PermSize=${PermGen}</jvmArg>
-						<jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
-						<jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-			<!-- hivemall-spark_xx-xx.jar -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<version>2.5</version>
-				<configuration>
-					<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}</finalName>
-					<outputDirectory>${project.parent.build.directory}</outputDirectory>
-				</configuration>
-			</plugin>
-			<!-- hivemall-spark_xx-xx-with-dependencies.jar including minimum dependencies -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>2.3</version>
-				<executions>
-					<execution>
-						<id>jar-with-dependencies</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<finalName>${project.artifactId}-${spark.binary.version}_${scala.binary.version}-${project.version}-with-dependencies</finalName>
-							<outputDirectory>${project.parent.build.directory}</outputDirectory>
-							<minimizeJar>false</minimizeJar>
-							<createDependencyReducedPom>false</createDependencyReducedPom>
-							<artifactSet>
-								<includes>
-									<include>io.github.myui:hivemall-core</include>
-									<include>io.github.myui:hivemall-spark-common</include>
-									<include>com.github.haifengl:smile-core</include>
-									<include>com.github.haifengl:smile-math</include>
-									<include>com.github.haifengl:smile-data</include>
-								</includes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<!-- disable surefire because there is no java test -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.7</version>
-				<configuration>
-					<skipTests>true</skipTests>
-				</configuration>
-			</plugin>
-			<!-- then, enable scalatest -->
-			<plugin>
-				<groupId>org.scalatest</groupId>
-				<artifactId>scalatest-maven-plugin</artifactId>
-				<version>1.0</version>
-				<!-- Note config is repeated in surefire config -->
-				<configuration>
-					<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-					<junitxml>.</junitxml>
-					<filereports>SparkTestSuite.txt</filereports>
-					<argLine>-ea -Xmx2g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
-					<stderr/>
-					<environmentVariables>
-						<SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
-						<SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
-						<SPARK_TESTING>1</SPARK_TESTING>
-						<JAVA_HOME>${env.JAVA_HOME}</JAVA_HOME>
-					</environmentVariables>
-					<systemProperties>
-						<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
-						<derby.system.durability>test</derby.system.durability>
-						<java.awt.headless>true</java.awt.headless>
-						<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
-						<spark.testing>1</spark.testing>
-						<spark.ui.enabled>false</spark.ui.enabled>
-						<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
-						<spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
-						<!-- Needed by sql/hive tests. -->
-						<test.src.tables>__not_used__</test.src.tables>
-					</systemProperties>
-					<tagsToExclude>${test.exclude.tags}</tagsToExclude>
-				</configuration>
-				<executions>
-					<execution>
-						<id>test</id>
-						<goals>
-							<goal>test</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/resources/log4j.properties b/spark/spark-1.6/src/main/resources/log4j.properties
deleted file mode 100644
index 72bf5b6..0000000
--- a/spark/spark-1.6/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.eclipse.jetty=INFO
-log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala b/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
deleted file mode 100644
index 01664f4..0000000
--- a/spark/spark-1.6/src/main/scala/hivemall/tools/RegressionDatagen.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools
-
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.types._
-
-object RegressionDatagen {
-
-  /**
-   * Generate data for regression/classification.
-   * See [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]]
-   * for the details of arguments below.
-   */
-  def exec(sc: SQLContext,
-           n_partitions: Int = 2,
-           min_examples: Int = 1000,
-           n_features: Int = 10,
-           n_dims: Int = 200,
-           seed: Int = 43,
-           dense: Boolean = false,
-           prob_one: Float = 0.6f,
-           sort: Boolean = false,
-           cl: Boolean = false): DataFrame = {
-
-    require(n_partitions > 0, "Non-negative #n_partitions required.")
-    require(min_examples > 0, "Non-negative #min_examples required.")
-    require(n_features > 0, "Non-negative #n_features required.")
-    require(n_dims > 0, "Non-negative #n_dims required.")
-
-    // Calculate #examples to generate in each partition
-    val n_examples = (min_examples + n_partitions - 1) / n_partitions
-
-    val df = sc.createDataFrame(
-        sc.sparkContext.parallelize((0 until n_partitions).map(Row(_)), n_partitions),
-        StructType(
-          StructField("data", IntegerType, true) ::
-          Nil)
-      )
-    import sc.implicits._
-    df.lr_datagen(
-      s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one"
-        + (if (dense) " -dense" else "")
-        + (if (sort) " -sort" else "")
-        + (if (cl) " -cl" else ""))
-      .select($"label".cast(DoubleType).as("label"), $"features")
-  }
-}

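RegressionDatagen.exec above is a thin wrapper over the lr_datagen UDTF that yields a label/features DataFrame. For context, a typical spark-shell call against the pre-removal Spark 1.6 Hivemall artifact would have looked roughly like the sketch below; the sqlContext value and the concrete argument values are illustrative and not taken from this commit.

// Sketch only: requires the (now removed) hivemall-spark 1.6 jar on the classpath
// and an existing org.apache.spark.sql.SQLContext named `sqlContext`.
// Argument names mirror the signature shown in the diff above.
import hivemall.tools.RegressionDatagen

val trainDf = RegressionDatagen.exec(
  sqlContext,
  min_examples = 10000,  // at least this many rows, split across partitions
  n_features   = 10,     // non-zero features per example
  n_dims       = 200,    // dimensionality of the feature space
  dense        = false,  // keep sparse feature vectors
  cl           = true)   // generate class labels rather than regression targets

trainDf.printSchema()    // label (double) and features columns
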
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
deleted file mode 100644
index 18ef9df..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{AnalysisException, DataFrame, GroupedData}
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Cube, Pivot, Rollup}
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-
-final class GroupedDataEx protected[sql](
-    df: DataFrame,
-    groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType)
-  extends GroupedData(df, groupingExprs, groupType) {
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  // toDF, alias, and strToExpr are copied from the base class, GroupedData, because
-  // these methods have 'private[this]' modifiers.
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
-    val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
-      groupingExprs ++ aggExprs
-    } else {
-      aggExprs
-    }
-
-    val aliasedAgg = aggregates.map(alias)
-
-    groupType match {
-      case GroupedData.GroupByType =>
-        DataFrame(
-          df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
-      case GroupedData.RollupType =>
-        DataFrame(
-          df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.CubeType =>
-        DataFrame(
-          df.sqlContext, Cube(groupingExprs, df.logicalPlan, aliasedAgg))
-      case GroupedData.PivotType(pivotCol, values) =>
-        val aliasedGrps = groupingExprs.map(alias)
-        DataFrame(
-          df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
-    }
-  }
-
-  private[this] def alias(expr: Expression): NamedExpression = expr match {
-    case u: UnresolvedAttribute => UnresolvedAlias(u)
-    case expr: NamedExpression => expr
-    case expr: Expression => Alias(expr, expr.prettyString)()
-  }
-
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    val exprToFunc: (Expression => Expression) = {
-      (inputExpr: Expression) => expr.toLowerCase match {
-        case "avg" | "average" | "mean" =>
-          UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
-        case "stddev" | "std" =>
-          UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
-        // Also special handle count because we need to take care count(*).
-        case "count" | "size" =>
-          // Turn count(*) into count(1)
-          inputExpr match {
-            case s: Star => Count(Literal(1)).toAggregateExpression()
-            case _ => Count(inputExpr).toAggregateExpression()
-          }
-        case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false)
-      }
-    }
-    (inputExpr: Expression) => exprToFunc(inputExpr)
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  // `agg` only supports aggregation functions with a single argument
-  override def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
-      val a = expr match {
-        case "voted_avg" =>
-          HiveUDAFFunction(
-            new HiveFunctionWrapper("hivemall.ensemble.bagging.VotedAvgUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression
-        case "weight_voted_avg" =>
-          HiveUDAFFunction(
-            new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression()
-        case "rf_ensemble" =>
-          HiveUDAFFunction(
-            new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"),
-            Seq(df.col(colName).expr),
-            isUDAFBridgeRequired = true
-          ).toAggregateExpression()
-        case _ =>
-          strToExpr(expr)(df(colName).expr)
-      }
-      Alias(a, a.prettyString)()
-    }.toSeq)
-  }
-
-  private[this] def checkType(colName: String, expected: DataType) = {
-    val dataType = df.resolve(colName).dataType
-    if (dataType != expected) {
-      throw new AnalysisException(
-        s""""$colName" must be $expected, however it is $dataType""")
-    }
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.VotedAvgUDAF
-   */
-  def voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-        Seq(weight).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.bagging.WeightVotedAvgUDAF
-   */
-  def weight_voted_avg(weight: String): DataFrame = {
-    // checkType(weight, NumericType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"),
-        Seq(weight).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.ArgminKLDistanceUDAF
-   */
-  def argmin_kld(weight: String, conv: String): DataFrame = {
-    // checkType(weight, NumericType)
-    // checkType(conv, NumericType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"),
-        Seq(weight, conv).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxValueLabelUDAF
-   */
-  def max_label(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"),
-        Seq(score, label).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ensemble.MaxRowUDAF
-   */
-  def maxrow(score: String, label: String): DataFrame = {
-    // checkType(score, NumericType)
-    checkType(label, StringType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"),
-        Seq(score, label).map(df.col(_).expr),
-        isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.smile.tools.RandomForestEnsembleUDAF
-   */
-  def rf_ensemble(predict: String): DataFrame = {
-    // checkType(predict, NumericType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"),
-        Seq(predict).map(df.col(_).expr),
-        isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanAbsoluteErrorUDAF
-   */
-  def mae(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.MeanSquareErrorUDAF
-   */
-  def mse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.RootMeanSquareErrorUDAF
-   */
-  def rmse(predict: String, target: String): DataFrame = {
-    checkType(predict, FloatType)
-    checkType(target, FloatType)
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.evaluation.FMeasureUDAF
-   */
-  def f1score(predict: String, target: String): DataFrame = {
-    // checkType(target, ArrayType(IntegerType))
-    // checkType(predict, ArrayType(IntegerType))
-    val udaf = HiveUDAFFunction(
-        new HiveFunctionWrapper("hivemall.evaluation.FMeasureUDAF"),
-        Seq(predict, target).map(df.col(_).expr),
-        isUDAFBridgeRequired = true)
-      .toAggregateExpression()
-    toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.OnehotEncodingUDAF
-   */
-  def onehot_encoding(features: String*): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"),
-      features.map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-
-  /**
-   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
-   */
-  def snr(X: String, Y: String): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"),
-      Seq(X, Y).map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-
-  /**
-   * @see hivemall.tools.matrix.TransposeAndDotUDAF
-   */
-  def transpose_and_dot(X: String, Y: String): DataFrame = {
-    val udaf = HiveUDAFFunction(
-      new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"),
-      Seq(X, Y).map(df.col(_).expr),
-      isUDAFBridgeRequired = false)
-      .toAggregateExpression()
-    toDF(Seq(Alias(udaf, udaf.prettyString)()))
-  }
-}

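The GroupedDataEx methods removed above all follow one recipe: pick a Hivemall UDAF class by name, wrap it with HiveUDAFFunction/HiveFunctionWrapper, and alias the resulting aggregate expression. The snippet below is a self-contained sketch of just the name-dispatch part of that recipe; Agg, HiveUdaf, Builtin, and resolve are invented names for illustration and are not Spark or Hivemall APIs.

// Self-contained sketch of the dispatch used by the removed GroupedDataEx.agg:
// a few aggregate names map to Hivemall UDAF implementation classes, everything
// else falls back to the built-in resolver (avg, count, ...).
object AggDispatchSketch {
  sealed trait Agg
  final case class HiveUdaf(className: String, column: String) extends Agg
  final case class Builtin(name: String, column: String) extends Agg

  def resolve(column: String, expr: String): Agg = expr match {
    case "voted_avg"        => HiveUdaf("hivemall.ensemble.bagging.VotedAvgUDAF", column)
    case "weight_voted_avg" => HiveUdaf("hivemall.ensemble.bagging.WeightVotedAvgUDAF", column)
    case "rf_ensemble"      => HiveUdaf("hivemall.smile.tools.RandomForestEnsembleUDAF", column)
    case other              => Builtin(other, column)
  }

  def main(args: Array[String]): Unit = {
    println(resolve("weight", "voted_avg"))   // HiveUdaf(hivemall.ensemble.bagging.VotedAvgUDAF,weight)
    println(resolve("predict", "count"))      // Builtin(count,predict)
  }
}
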

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
deleted file mode 100644
index 8583e1c..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ /dev/null
@@ -1,1125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.util.UUID
-
-import org.apache.spark.Logging
-import org.apache.spark.ml.feature.HivemallFeature
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-
-/**
- * Hivemall wrapper and some utility functions for DataFrame.
- *
- * @groupname regression
- * @groupname classifier
- * @groupname classifier.multiclass
- * @groupname ensemble
- * @groupname knn.similarity
- * @groupname knn.distance
- * @groupname knn.lsh
- * @groupname ftvec
- * @groupname ftvec.amplify
- * @groupname ftvec.hashing
- * @groupname ftvec.scaling
- * @groupname ftvec.conv
- * @groupname ftvec.trans
- * @groupname misc
- */
-final class HivemallOps(df: DataFrame) extends Logging {
-
-  /**
-   * An implicit conversion to avoid doing annoying transformation.
-   */
-  @inline
-  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan) =
-    DataFrame(df.sqlContext, logicalPlan)
-
-  /**
-   * @see hivemall.regression.AdaDeltaUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_adadelta(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.AdaDeltaUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.AdaGradUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_adagrad(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.AdaGradUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arow_regr(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arowe_regr(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.AROWRegressionUDTF$AROWe2
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_arowe2_regr(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.LogressUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_logregr(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.LogressUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa1_regr(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa1a_regr(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa2_regr(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_pa2a_regr(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.smile.regression.RandomForestRegressionUDTF
-   * @group regression
-   */
-  @scala.annotation.varargs
-  def train_randomforest_regr(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.smile.regression.RandomForestRegressionUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
-        .map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.PerceptronUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_perceptron(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.PerceptronUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA1
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa1(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA1"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.PassiveAggressiveUDTF$PA2
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_pa2(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.ConfidenceWeightedUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_cw(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.ConfidenceWeightedUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.AROWClassifierUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_arow(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.AROWClassifierUDTF$AROWh
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_arowh(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF$AROWh"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_scw(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW1"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW2
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_scw2(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.AdaGradRDAUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_adagrad_rda(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.AdaGradRDAUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.smile.classification.RandomForestClassifierUDTF
-   * @group classifier
-   */
-  @scala.annotation.varargs
-  def train_randomforest_classifier(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.smile.classification.RandomForestClassifierUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests")
-        .map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPerceptronUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_perceptron(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPerceptronUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa1(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_pa2(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_cw(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_arow(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_scw(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2
-   * @group classifier.multiclass
-   */
-  @scala.annotation.varargs
-  def train_multiclass_scw2(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper(
-          "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2"),
-        setMixServs(exprs: _*).map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedDataEx]] for all the available aggregate functions.
-   *
-   * TODO: This class bypasses the original GroupedData
-   * so as to support user-defined aggregations.
-   * Need a smarter way to inject this into the existing DataFrame APIs.
-   *
-   * A list of the added Hivemall UDAFs:
-   *  - voted_avg
-   *  - weight_voted_avg
-   *  - argmin_kld
-   *  - max_label
-   *  - maxrow
-   *  - f1score
-   *  - mae
-   *  - mse
-   *  - rmse
-   *
-   * @groupname ensemble
-   */
-  @scala.annotation.varargs
-  def groupby(cols: Column*): GroupedDataEx = {
-    new GroupedDataEx(df, cols.map(_.expr), GroupedData.GroupByType)
-  }
-
-  @scala.annotation.varargs
-  def groupby(col1: String, cols: String*): GroupedDataEx = {
-    val colNames: Seq[String] = col1 +: cols
-    new GroupedDataEx(df, colNames.map(colName => df(colName).expr), GroupedData.GroupByType)
-  }
-
-  /**
-   * @see hivemall.knn.lsh.MinHashUDTF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def minhash(exprs: Column*): DataFrame = {
-     Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.knn.lsh.MinHashUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("clusterid", "item").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.ftvec.amplify.AmplifierUDTF
-   * @group ftvec.amplify
-   */
-  @scala.annotation.varargs
-  def amplify(exprs: Column*): DataFrame = {
-    val outputAttr = exprs.drop(1).map {
-      case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name)
-      case Column(expr: Expression) => UnresolvedAttribute(expr.prettyString)
-    }
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.ftvec.amplify.AmplifierUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      outputAttr,
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.ftvec.amplify.RandomAmplifierUDTF
-   * @group ftvec.amplify
-   */
-  @scala.annotation.varargs
-  def rand_amplify(exprs: Column*): DataFrame = {
-    val outputAttr = exprs.drop(2).map {
-      case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name)
-      case Column(expr: Expression) => UnresolvedAttribute(expr.prettyString)
-    }
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.ftvec.amplify.RandomAmplifierUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      outputAttr,
-      df.logicalPlan)
-  }
-
-  /**
-   * Amplifies and shuffles data inside partitions.
-   * @group ftvec.amplify
-   */
-  def part_amplify(xtimes: Int): DataFrame = {
-    val rdd = df.rdd.mapPartitions({ iter =>
-      val elems = iter.flatMap{ row =>
-        Seq.fill[Row](xtimes)(row)
-      }
-      // Need to check how this shuffling affects results
-      scala.util.Random.shuffle(elems)
-    }, true)
-    df.sqlContext.createDataFrame(rdd, df.schema)
-  }
-
-  /**
-   * Quantifies input columns.
-   * @see hivemall.ftvec.conv.QuantifyColumnsUDTF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def quantify(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.ftvec.conv.QuantifyColumnsUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.BinarizeLabelUDTF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def binarize_label(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.ftvec.trans.BinarizeLabelUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.QuantifiedFeaturesUDTF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def quantified_features(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.ftvec.trans.QuantifiedFeaturesUDTF"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("features").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * Explodes a Seq[String] column into one row per element.
-   * @group ftvec
-   */
-  def explode_array(expr: Column): DataFrame = {
-    df.explode(expr) { case Row(v: Seq[_]) =>
-      // Type erasure removes the component type in Seq
-      v.map(s => HivemallFeature(s.asInstanceOf[String]))
-    }
-  }
-
-  def explode_array(expr: String): DataFrame =
-    this.explode_array(df(expr))
-
-  /**
-   * Returns the top-`k` records for each `group`.
-   * @group misc
-   */
-  def each_top_k(k: Column, group: Column, value: Column, args: Column*): DataFrame = {
-    val clusterDf = df.repartition(group).sortWithinPartitions(group)
-    Generate(HiveGenericUDTF(
-      new HiveFunctionWrapper("hivemall.tools.EachTopKUDTF"),
-      (Seq(k, group, value) ++ args).map(_.expr)),
-    join = false, outer = false, None,
-    (Seq("rank", "key") ++ args.map(_.named.name)).map(UnresolvedAttribute(_)),
-    clusterDf.logicalPlan)
-  }
-
-  /**
-   * Returns a new [[DataFrame]] with columns renamed.
-   * This is a wrapper for DataFrame#toDF.
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def as(colNames: String*): DataFrame = df.toDF(colNames: _*)
-
-  /**
-   * Returns all the columns in this [[DataFrame]] as a Seq[Column].
-   * @group misc
-   */
-  def cols: Seq[Column] = {
-    df.schema.fields.map(col => df.col(col.name)).toSeq
-  }
-
-  /**
-   * @see hivemall.dataset.LogisticRegressionDataGeneratorUDTF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def lr_datagen(exprs: Column*): DataFrame = {
-    Generate(HiveGenericUDTF(
-        new HiveFunctionWrapper("hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper"),
-        exprs.map(_.expr)),
-      join = false, outer = false, None,
-      Seq("label", "features").map(UnresolvedAttribute(_)),
-      df.logicalPlan)
-  }
-
-  /**
-   * :: Experimental ::
-   * If a '-mix' option is not given in the third argument,
-   * set it from the environment variable
-   * 'HIVEMALL_MIX_SERVERS'.
-   *
-   * TODO: This only works if '--deploy-mode' is 'client';
-   * otherwise, HIVEMALL_MIX_SERVERS needs to be set
-   * on all possible Spark workers.
-   */
-  private[this] def setMixServs(exprs: Column*): Seq[Column] = {
-    val mixes = System.getenv("HIVEMALL_MIX_SERVERS")
-    if (mixes != null && !mixes.isEmpty()) {
-      val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID
-      logInfo(s"set '${mixes}' as default mix servers (session: ${groupId})")
-      exprs.size match {
-        case 2 =>
-          exprs :+ Column(Literal.create(s"-mix ${mixes} -mix_session ${groupId}", StringType))
-        /** TODO: Add codes in the case where exprs.size == 3. */
-        case _ =>
-          exprs
-      }
-    } else {
-      exprs
-    }
-  }
-}
-
-object HivemallOps {
-
-  /**
-   * Implicitly inject the [[HivemallOps]] into [[DataFrame]].
-   */
-  implicit def dataFrameToHivemallOps(df: DataFrame): HivemallOps =
-    new HivemallOps(df)
-
-  /**
-   * An implicit conversion to avoid doing annoying transformations.
-   */
-  @inline private implicit def toColumn(expr: Expression) = Column(expr)
-
-  /**
-   * @see hivemall.HivemallVersionUDF
-   * @group misc
-   */
-  def hivemall_version(): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.HivemallVersionUDF"), Nil)
-  }
-
-  /**
-   * @see hivemall.knn.similarity.CosineSimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def cosine_sim(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.similarity.CosineSimilarityUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.similarity.JaccardIndexUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def jaccard(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.knn.similarity.JaccardIndexUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.similarity.AngularSimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def angular_similarity(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.similarity.AngularSimilarityUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.similarity.EuclidSimilarity
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def euclid_similarity(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.similarity.EuclidSimilarity"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.similarity.Distance2SimilarityUDF
-   * @group knn.similarity
-   */
-  @scala.annotation.varargs
-  def distance2similarity(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because unsupported types are used
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.similarity.Distance2SimilarityUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.HammingDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def hamming_distance(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.HammingDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.PopcountUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def popcnt(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.PopcountUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.KLDivergenceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def kld(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.KLDivergenceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.EuclidDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def euclid_distance(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.EuclidDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.CosineDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def cosine_distance(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.CosineDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.AngularDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def angular_distance(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.AngularDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.ManhattanDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def manhattan_distance(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.ManhattanDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.distance.MinkowskiDistanceUDF
-   * @group knn.distance
-   */
-  @scala.annotation.varargs
-  def minkowski_distance(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.distance.MinkowskiDistanceUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.lsh.bBitMinHashUDF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def bbit_minhash(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.knn.lsh.bBitMinHashUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.knn.lsh.MinHashesUDF
-   * @group knn.lsh
-   */
-  @scala.annotation.varargs
-  def minhashes(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.knn.lsh.MinHashesUDFWrapper"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.AddBiasUDF
-   * @group ftvec
-   */
-  @scala.annotation.varargs
-  def add_bias(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.AddBiasUDFWrapper"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.ExtractFeatureUdf
-   * @group ftvec
-   *
-   * TODO: This throws java.lang.ClassCastException because
-   * HiveInspectors.toInspector has a bug in spark.
-   * Need to fix it later.
-   */
-  def extract_feature(expr: Column): Column = {
-    val hiveUdf = HiveGenericUDF(
-      new HiveFunctionWrapper("hivemall.ftvec.ExtractFeatureUDFWrapper"),
-      expr.expr :: Nil)
-    Column(hiveUdf).as("feature")
-  }
-
-  /**
-   * @see hivemall.ftvec.ExtractWeightUdf
-   * @group ftvec
-   *
-   * TODO: This throws java.lang.ClassCastException because
-   * HiveInspectors.toInspector has a bug in spark.
-   * Need to fix it later.
-   */
-  def extract_weight(expr: Column): Column = {
-    val hiveUdf = HiveGenericUDF(
-      new HiveFunctionWrapper("hivemall.ftvec.ExtractWeightUDFWrapper"),
-      expr.expr :: Nil)
-    Column(hiveUdf).as("value")
-  }
-
-  /**
-   * @see hivemall.ftvec.AddFeatureIndexUDFWrapper
-   * @group ftvec
-   */
-  def add_feature_index(expr: Column): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.AddFeatureIndexUDFWrapper"), expr.expr :: Nil)
-  }
-
-  /**
-   * @see hivemall.ftvec.SortByFeatureUDF
-   * @group ftvec
-   */
-  def sort_by_feature(expr: Column): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.SortByFeatureUDFWrapper"), expr.expr :: Nil)
-  }
-
-  /**
-   * @see hivemall.ftvec.hashing.MurmurHash3UDF
-   * @group ftvec.hashing
-   */
-  def mhash(expr: Column): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.hashing.MurmurHash3UDF"), expr.expr :: Nil)
-  }
-
-  /**
-   * @see hivemall.ftvec.hashing.Sha1UDF
-   * @group ftvec.hashing
-   */
-  def sha1(expr: Column): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.hashing.Sha1UDF"), expr.expr :: Nil)
-  }
-
-  /**
-   * @see hivemall.ftvec.hashing.ArrayHashValuesUDF
-   * @group ftvec.hashing
-   */
-  @scala.annotation.varargs
-  def array_hash_values(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because unsupported types are used
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.hashing.ArrayHashValuesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF
-   * @group ftvec.hashing
-   */
-  @scala.annotation.varargs
-  def prefixed_hash_values(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because unsupported types are used
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.scaling.RescaleUDF
-   * @group ftvec.scaling
-   */
-  @scala.annotation.varargs
-  def rescale(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.scaling.RescaleUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.scaling.ZScoreUDF
-   * @group ftvec.scaling
-   */
-  @scala.annotation.varargs
-  def zscore(exprs: Column*): Column = {
-    HiveSimpleUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.scaling.ZScoreUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.scaling.L2NormalizationUDF
-   * @group ftvec.scaling
-   */
-  def normalize(expr: Column): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.scaling.L2NormalizationUDFWrapper"), expr.expr :: Nil)
-  }
-
-  /**
-   * @see hivemall.ftvec.selection.ChiSquareUDF
-   * @group ftvec.selection
-   */
-  def chi2(observed: Column, expected: Column): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.selection.ChiSquareUDF"), Seq(observed.expr, expected.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def to_dense_features(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because unsupported types are used
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.conv.ToDenseFeaturesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.conv.ToSparseFeaturesUDF
-   * @group ftvec.conv
-   */
-  @scala.annotation.varargs
-  def to_sparse_features(exprs: Column*): Column = {
-    // TODO: Need a wrapper class because unsupported types are used
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.conv.ToSparseFeaturesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.VectorizeFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def vectorize_features(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.trans.VectorizeFeaturesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.CategoricalFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def categorical_features(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.trans.CategoricalFeaturesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.IndexedFeatures
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def indexed_features(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.trans.IndexedFeatures"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.ftvec.trans.QuantitativeFeaturesUDF
-   * @group ftvec.trans
-   */
-  @scala.annotation.varargs
-  def quantitative_features(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.ftvec.trans.QuantitativeFeaturesUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.smile.tools.TreePredictUDF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def tree_predict(exprs: Column*): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.smile.tools.TreePredictUDF"), exprs.map(_.expr))
-  }
-
-  /**
-   * @see hivemall.tools.array.SelectKBestUDF
-   * @group tools.array
-   */
-  def select_k_best(X: Column, importanceList: Column, k: Column): Column = {
-    HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.tools.array.SelectKBestUDF"), Seq(X.expr, importanceList.expr, k.expr))
-  }
-
-  /**
-   * @see hivemall.tools.math.SigmoidUDF
-   * @group misc
-   */
-  @scala.annotation.varargs
-  def sigmoid(exprs: Column*): Column = {
-    /**
-     * TODO: SigmoidUDF only accepts floating-point types in spark-v1.5.0?
-     */
-    val value = exprs.head
-    val one: () => Literal = () => Literal.create(1.0, DoubleType)
-    Column(one()) / (Column(one()) + exp(-value))
-  }
-
-  /**
-   * @see hivemall.tools.mapred.RowIdUDF
-   * @group misc
-   */
-  def rowid(): Column = {
-    val hiveUdf = HiveGenericUDF(new HiveFunctionWrapper(
-      "hivemall.tools.mapred.RowIdUDFWrapper"), Nil)
-    hiveUdf.as("rowid")
-  }
-}
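
For context, the DataFrame extensions deleted above were pulled in through the implicit conversion in the HivemallOps object. The following is a minimal sketch of how they were typically chained on Spark 1.6; the input DataFrame, its column names, and the surrounding method are assumptions for illustration, not code from this module:

  import org.apache.spark.sql.DataFrame

  // Sketch only: `trainDf` is assumed to have a Double "label" column and a
  // Seq[String] "features" column in Hivemall's "<index>:<value>" format.
  def averagedPa1Model(trainDf: DataFrame): DataFrame = {
    import trainDf.sqlContext.implicits._
    import org.apache.spark.sql.hive.HivemallOps._  // injects train_*, groupby, add_bias, ...

    trainDf
      .train_pa1_regr(add_bias($"features"), $"label")  // UDTF emits (feature, weight) rows
      .groupby("feature").agg("weight" -> "avg")        // fold per-row weights into one model
  }

Averaging the emitted weights per feature is the usual way to collapse the per-row UDTF output back into a single (feature, weight) model table.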

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
deleted file mode 100644
index dff62b3..0000000
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
-import org.apache.spark.sql.{Column, DataFrame, Row, UserDefinedFunction}
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
-
-object HivemallUtils {
-
-  // Maximum number of dimensions for feature vectors
-  val maxDims = 100000000
-
-  /**
-   * Implicit conversions to avoid doing annoying transformations.
-   * This class must be in o.a.spark.sql._ because
-   * the Column class is private.
-   */
-  @inline implicit def toBooleanLiteral(i: Boolean): Column = Column(Literal.create(i, BooleanType))
-  @inline implicit def toIntLiteral(i: Int): Column = Column(Literal.create(i, IntegerType))
-  @inline implicit def toFloatLiteral(i: Float): Column = Column(Literal.create(i, FloatType))
-  @inline implicit def toDoubleLiteral(i: Double): Column = Column(Literal.create(i, DoubleType))
-  @inline implicit def toStringLiteral(i: String): Column = Column(Literal.create(i, StringType))
-  @inline implicit def toIntArrayLiteral(i: Seq[Int]): Column =
-    Column(Literal.create(i, ArrayType(IntegerType)))
-  @inline implicit def toStringArrayLiteral(i: Seq[String]): Column =
-    Column(Literal.create(i, ArrayType(StringType)))
-
-  /**
-   * Check whether the given schema contains a column of the required data type.
-   * @param colName  column name
-   * @param dataType  required column data type
-   */
-  def checkColumnType(schema: StructType, colName: String, dataType: DataType): Unit = {
-    val actualDataType = schema(colName).dataType
-    require(actualDataType.equals(dataType),
-      s"Column $colName must be of type $dataType but was actually $actualDataType.")
-  }
-
-  /**
-   * Builds a prediction UDF from a Hivemall model DataFrame with 'feature' and 'weight' columns.
-   */
-  def funcModel(df: DataFrame, dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
-    checkColumnType(df.schema, "feature", StringType)
-    checkColumnType(df.schema, "weight", DoubleType)
-
-    import df.sqlContext.implicits._
-    val intercept = df
-      .where($"feature" === "0")
-      .select($"weight")
-      .map { case Row(weight: Double) => weight}
-      .reduce(_ + _)
-    val weights = funcVectorizerImpl(dense, dims)(
-      df.select($"feature", $"weight")
-        .where($"feature" !== "0")
-        .map { case Row(feature: String, weight: Double) => s"${feature}:$weight"}
-        .collect.toSeq)
-
-    udf((input: Vector) => BLAS.dot(input, weights) + intercept)
-  }
-
-  /**
-   * Builds a UDF that transforms Hivemall string features into a Vector.
-   */
-  def funcVectorizer(dense: Boolean = false, dims: Int = maxDims)
-    : UserDefinedFunction = {
-    udf(funcVectorizerImpl(dense, dims))
-  }
-
-  private def funcVectorizerImpl(dense: Boolean, dims: Int)
-    : Seq[String] => Vector = {
-    if (dense) {
-      // Dense features
-      i: Seq[String] => {
-        val features = new Array[Double](dims)
-        i.map { ft =>
-          val s = ft.split(":").ensuring(_.size == 2)
-          features(s(0).toInt) = s(1).toDouble
-        }
-        Vectors.dense(features)
-      }
-    } else {
-      // Sparse features
-      i: Seq[String] => {
-        val features = i.map { ft =>
-          // val s = ft.split(":").ensuring(_.size == 2)
-          val s = ft.split(":")
-          (s(0).toInt, s(1).toDouble)
-        }
-        Vectors.sparse(dims, features)
-      }
-    }
-  }
-}
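
As a rough illustration of how the helpers above fit together, the sketch below scores a test set with a (feature, weight) model produced by one of the train_* UDTFs; the method and DataFrame names are assumptions, and the model is assumed to contain a bias row (feature == "0"), e.g. produced via add_bias:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.hive.HivemallUtils._

  // Sketch only: `testDf` is assumed to have a Seq[String] "features" column.
  def predict(modelDf: DataFrame, testDf: DataFrame): DataFrame = {
    import testDf.sqlContext.implicits._
    val model = funcModel(modelDf)                 // UDF: Vector => Double (weights + intercept)
    val vectorize = funcVectorizer(dense = false)  // UDF: Seq[String] => sparse Vector
    testDf.select(model(vectorize($"features")).as("predicted"))
  }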

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/resources/log4j.properties b/spark/spark-1.6/src/test/resources/log4j.properties
deleted file mode 100644
index 1db11f0..0000000
--- a/spark/spark-1.6/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=FATAL, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala b/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
deleted file mode 100644
index dbb818b..0000000
--- a/spark/spark-1.6/src/test/scala/hivemall/mix/server/MixServerSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.mix.server
-
-import java.util.Random
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
-import java.util.logging.Logger
-
-import hivemall.mix.MixMessage.MixEventName
-import hivemall.mix.client.MixClient
-import hivemall.mix.server.MixServer.ServerState
-import hivemall.model.{DenseModel, PredictionModel, WeightValue}
-import hivemall.utils.io.IOUtils
-import hivemall.utils.lang.CommandLineUtils
-import hivemall.utils.net.NetUtils
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MixServerSuite extends FunSuite with BeforeAndAfter {
-
-  private[this] var server: MixServer = _
-  private[this] var executor : ExecutorService = _
-  private[this] var port: Int = _
-
-  private[this] val rand = new Random(43)
-  private[this] val counter = Stream.from(0).iterator
-
-  private[this] val eachTestTime = 100
-  private[this] val logger =
-    Logger.getLogger(classOf[MixServerSuite].getName)
-
-  before {
-    this.port = NetUtils.getAvailablePort
-    this.server = new MixServer(
-      CommandLineUtils.parseOptions(
-        Array("-port", s"${port}", "-sync_threshold", "3"),
-        MixServer.getOptions()
-      )
-    )
-    this.executor = Executors.newSingleThreadExecutor
-    this.executor.submit(server)
-    var retry = 0
-    while (server.getState() != ServerState.RUNNING && retry < 50) {
-      Thread.sleep(1000L)
-      retry += 1
-    }
-    assert(server.getState == ServerState.RUNNING)
-  }
-
-  after { this.executor.shutdown() }
-
-  private[this] def clientDriver(
-      groupId: String, model: PredictionModel, numMsg: Int = 1000000): Unit = {
-    var client: MixClient = null
-    try {
-      client = new MixClient(MixEventName.average, groupId, s"localhost:${port}", false, 2, model)
-      model.configureMix(client, false)
-      model.configureClock()
-
-      for (_ <- 0 until numMsg) {
-        val feature = Integer.valueOf(rand.nextInt(model.size))
-        model.set(feature, new WeightValue(1.0f))
-      }
-
-      while (true) { Thread.sleep(eachTestTime * 1000 + 100L) }
-      assert(model.getNumMixed > 0)
-    } finally {
-      IOUtils.closeQuietly(client)
-    }
-  }
-
-  private[this] def fixedGroup: (String, () => String) =
-    ("fixed", () => "fixed")
-  private[this] def uniqueGroup: (String, () => String) =
-    ("unique", () => s"${counter.next}")
-
-  Seq(65536).map { ndims =>
-    Seq(4).map { nclient =>
-      Seq(fixedGroup, uniqueGroup).map { id =>
-        val testName = s"dense-dim:${ndims}-clinet:${nclient}-${id._1}"
-        ignore(testName) {
-          val clients = Executors.newCachedThreadPool()
-          val numClients = nclient
-          val models = (0 until numClients).map(i => new DenseModel(ndims, false))
-          (0 until numClients).map { i =>
-            clients.submit(new Runnable() {
-              override def run(): Unit = {
-                try {
-                  clientDriver(
-                    s"${testName}-${id._2}",
-                    models(i)
-                  )
-                } catch {
-                  case e: InterruptedException =>
-                    assert(false, e.getMessage)
-                }
-              }
-            })
-          }
-          clients.awaitTermination(eachTestTime, TimeUnit.SECONDS)
-          clients.shutdown()
-          val nMixes = models.map(d => d.getNumMixed).reduce(_ + _)
-          logger.info(s"${testName} --> ${(nMixes + 0.0) / eachTestTime} mixes/s")
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala b/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
deleted file mode 100644
index f203fc2..0000000
--- a/spark/spark-1.6/src/test/scala/hivemall/tools/RegressionDatagenSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.sql.hive.test.TestHive
-
-class RegressionDatagenSuite extends FunSuite {
-
-  test("datagen") {
-   val df = RegressionDatagen.exec(
-     TestHive, min_examples = 10000, n_features = 100, n_dims = 65536, dense = false, cl = true)
-    assert(df.count() >= 10000)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
deleted file mode 100644
index 991e46f..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark
-
-// scalastyle:off
-import org.scalatest.{FunSuite, Outcome}
-
-/**
- * Base abstract class for all unit tests in Spark for handling common functionality.
- */
-private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
-// scalastyle:on
-
-  /**
-   * Log the suite name and the test name before and after each test.
-   *
-   * Subclasses should never override this method. If they wish to run
-   * custom code before and after each test, they should mix in the
-   * {{org.scalatest.BeforeAndAfter}} trait instead.
-   */
-  final protected override def withFixture(test: NoArgTest): Outcome = {
-    val testName = test.text
-    val suiteName = this.getClass.getName
-    val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
-    try {
-      logInfo(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n")
-      test()
-    } finally {
-      logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
deleted file mode 100644
index f57983f..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/ml/feature/HivemallLabeledPointSuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.ml.feature
-
-import org.apache.spark.SparkFunSuite
-
-class HivemallLabeledPointSuite extends SparkFunSuite {
-
-  test("toString") {
-    val lp = HivemallLabeledPoint(1.0f, Seq("1:0.5", "3:0.3", "8:0.1"))
-    assert(lp.toString === "1.0,[1:0.5,3:0.3,8:0.1]")
-  }
-
-  test("parse") {
-    val lp = HivemallLabeledPoint.parse("1.0,[1:0.5,3:0.3,8:0.1]")
-    assert(lp.label === 1.0)
-    assert(lp.features === Seq("1:0.5", "3:0.3", "8:0.1"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
deleted file mode 100644
index ef520ae..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql
-
-import java.util.{Locale, TimeZone}
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{LogicalRDD, Queryable}
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-abstract class QueryTest extends PlanTest {
-
-  protected def sqlContext: SQLContext
-
-  // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
-  TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
-  // Add Locale setting
-  Locale.setDefault(Locale.US)
-
-  /**
-   * Runs the plan and makes sure the answer either contains all of the keywords or
-   * contains none of them.
-   * @param df the [[DataFrame]] to be executed
-   * @param exists true to make sure the keywords are listed in the output; false to
-   *               make sure none of the keywords appear in the output
-   * @param keywords the keywords to check for
-   */
-  def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) {
-    val outputs = df.collect().map(_.mkString).mkString
-    for (key <- keywords) {
-      if (exists) {
-        assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
-      } else {
-        assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
-      }
-    }
-  }
-
-  /**
-   * Evaluates a dataset to make sure that the result of calling collect matches the given
-   * expected answer.
-   *  - Special handling is done based on whether the query plan should be expected to return
-   *    the results in sorted order.
-   *  - This function also checks to make sure that the schema for serializing the expected answer
-   *    matches that produced by the dataset (i.e. does manual construction of an object match
-   *    the constructed encoder for cases like joins, etc).  Note that this means it will fail
-   *    for cases where fields are reordered.  For such tests, use `checkDecoding` instead,
-   *    which performs a subset of the checks done by this function.
-   */
-  protected def checkAnswer[T](
-      ds: Dataset[T],
-      expectedAnswer: T*): Unit = {
-    checkAnswer(
-      ds.toDF(),
-      sqlContext.createDataset(expectedAnswer)(ds.unresolvedTEncoder).toDF().collect().toSeq)
-
-    checkDecoding(ds, expectedAnswer: _*)
-  }
-
-  protected def checkDecoding[T](
-      ds: => Dataset[T],
-      expectedAnswer: T*): Unit = {
-    val decoded = try ds.collect().toSet catch {
-      case e: Exception =>
-        fail(
-          s"""
-             |Exception collecting dataset as objects
-             |${ds.resolvedTEncoder}
-             |${ds.resolvedTEncoder.fromRowExpression.treeString}
-             |${ds.queryExecution}
-           """.stripMargin, e)
-    }
-
-    if (decoded != expectedAnswer.toSet) {
-      val expected = expectedAnswer.toSet.toSeq.map((a: Any) => a.toString).sorted
-      val actual = decoded.toSet.toSeq.map((a: Any) => a.toString).sorted
-
-      val comparison = sideBySide("expected" +: expected, "spark" +: actual).mkString("\n")
-      fail(
-        s"""Decoded objects do not match expected objects:
-            |$comparison
-            |${ds.resolvedTEncoder.fromRowExpression.treeString}
-         """.stripMargin)
-    }
-  }
-
-  /**
-   * Runs the plan and makes sure the answer matches the expected result.
-   * @param df the [[DataFrame]] to be executed
-   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
-   */
-  protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
-    val analyzedDF = try df catch {
-      case ae: AnalysisException =>
-        val currentValue = sqlContext.conf.dataFrameEagerAnalysis
-        sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
-        val partiallyAnalyzedPlan = df.queryExecution.analyzed
-        sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, currentValue)
-        fail(
-          s"""
-             |Failed to analyze query: $ae
-             |$partiallyAnalyzedPlan
-             |
-             |${stackTraceToString(ae)}
-             |""".stripMargin)
-    }
-
-    assertEmptyMissingInput(df)
-
-    QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
-      case Some(errorMessage) => fail(errorMessage)
-      case None =>
-    }
-  }
-
-  protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = {
-    checkAnswer(df, Seq(expectedAnswer))
-  }
-
-  protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
-    checkAnswer(df, expectedAnswer.collect())
-  }
-
-  /**
-   * Runs the plan and makes sure the answer is within absTol of the expected result.
-   * @param dataFrame the [[DataFrame]] to be executed
-   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
-   * @param absTol the absolute tolerance between actual and expected answers.
-   */
-  protected def checkAggregatesWithTol(dataFrame: DataFrame,
-      expectedAnswer: Seq[Row],
-      absTol: Double): Unit = {
-    // TODO: catch exceptions in data frame execution
-    val actualAnswer = dataFrame.collect()
-    require(actualAnswer.length == expectedAnswer.length,
-      s"actual num rows ${actualAnswer.length} != expected num of rows ${expectedAnswer.length}")
-
-    actualAnswer.zip(expectedAnswer).foreach {
-      case (actualRow, expectedRow) =>
-        QueryTest.checkAggregatesWithTol(actualRow, expectedRow, absTol)
-    }
-  }
-
-  protected def checkAggregatesWithTol(dataFrame: DataFrame,
-      expectedAnswer: Row,
-      absTol: Double): Unit = {
-    checkAggregatesWithTol(dataFrame, Seq(expectedAnswer), absTol)
-  }
-
-  /**
-   * Asserts that a given [[Queryable]] will be executed using the given number of cached results.
-   */
-  def assertCached(query: Queryable, numCachedTables: Int = 1): Unit = {
-    val planWithCaching = query.queryExecution.withCachedData
-    val cachedData = planWithCaching collect {
-      case cached: InMemoryRelation => cached
-    }
-
-    assert(
-      cachedData.size == numCachedTables,
-      s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
-        planWithCaching)
-  }
-
-  /**
-   * Asserts that a given [[Queryable]] does not have missing inputs in all the analyzed plans.
-   */
-  def assertEmptyMissingInput(query: Queryable): Unit = {
-    assert(query.queryExecution.analyzed.missingInput.isEmpty,
-      s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
-    assert(query.queryExecution.optimizedPlan.missingInput.isEmpty,
-      s"The optimized logical plan has missing inputs: ${query.queryExecution.optimizedPlan}")
-    assert(query.queryExecution.executedPlan.missingInput.isEmpty,
-      s"The physical plan has missing inputs: ${query.queryExecution.executedPlan}")
-  }
-}
-
-object QueryTest {
-  /**
-   * Runs the plan and makes sure the answer matches the expected result.
-   * If there was an exception during the execution, or if the contents of the DataFrame do not
-   * match the expected result, an error message will be returned. Otherwise, [[None]] will
-   * be returned.
-   * @param df the [[DataFrame]] to be executed
-   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
-   */
-  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
-    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
-
-    // We need to call prepareRow recursively to handle schemas with struct types.
-    def prepareRow(row: Row): Row = {
-      Row.fromSeq(row.toSeq.map {
-        case null => null
-        case d: java.math.BigDecimal => BigDecimal(d)
-        // Convert array to Seq for easy equality check.
-        case b: Array[_] => b.toSeq
-        case r: Row => prepareRow(r)
-        case o => o
-      })
-    }
-
-    def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
-      // Converts data to types that we can do equality comparison using Scala collections.
-      // For BigDecimal type, the Scala type has a better definition of equality test (similar to
-      // Java's java.math.BigDecimal.compareTo).
-      // For binary arrays, we convert them to Seq to avoid calling java.util.Arrays.equals for
-      // the equality test.
-      val converted: Seq[Row] = answer.map(prepareRow)
-      if (!isSorted) converted.sortBy(_.toString()) else converted
-    }
-    val sparkAnswer = try df.collect().toSeq catch {
-      case e: Exception =>
-        val errorMessage =
-          s"""
-            |Exception thrown while executing query:
-            |${df.queryExecution}
-            |== Exception ==
-            |$e
-            |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
-          """.stripMargin
-        return Some(errorMessage)
-    }
-
-    if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
-      val errorMessage =
-        s"""
-        |Results do not match for query:
-        |${df.queryExecution}
-        |== Results ==
-        |${sideBySide(
-          s"== Correct Answer - ${expectedAnswer.size} ==" +:
-            prepareAnswer(expectedAnswer).map(_.toString()),
-          s"== Spark Answer - ${sparkAnswer.size} ==" +:
-            prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
-      """.stripMargin
-      return Some(errorMessage)
-    }
-
-    return None
-  }
-
-  /**
-   * Runs the plan and makes sure the answer is within absTol of the expected result.
-   * @param actualAnswer the actual result in a [[Row]].
-   * @param expectedAnswer the expected result in a [[Row]].
-   * @param absTol the absolute tolerance between actual and expected answers.
-   */
-  protected def checkAggregatesWithTol(actualAnswer: Row, expectedAnswer: Row, absTol: Double) = {
-    require(actualAnswer.length == expectedAnswer.length,
-      s"actual answer length ${actualAnswer.length} != " +
-        s"expected answer length ${expectedAnswer.length}")
-
-    // TODO: support other numeric types besides Double
-    // TODO: support struct types?
-    actualAnswer.toSeq.zip(expectedAnswer.toSeq).foreach {
-      case (actual: Double, expected: Double) =>
-        assert(math.abs(actual - expected) < absTol,
-          s"actual answer $actual not within $absTol of correct answer $expected")
-      case (actual, expected) =>
-        assert(actual == expected, s"$actual did not equal $expected")
-    }
-  }
-
-  def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = {
-    checkAnswer(df, expectedAnswer.asScala) match {
-      case Some(errorMessage) => errorMessage
-      case None => null
-    }
-  }
-}
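
For reference, a minimal sketch of how the removed helpers were typically exercised (the
DataFrame and Row values are illustrative, not taken from the removed suites, and an
in-scope sqlContext is assumed):

    // Compare a DataFrame against an expected Seq[Row]; checkAnswer returns
    // Some(errorMessage) on mismatch and None when the results agree.
    val df = sqlContext.range(3)  // rows: 0, 1, 2
    checkAnswer(df, Seq(Row(0L), Row(1L), Row(2L))) match {
      case Some(errorMessage) => fail(errorMessage)
      case None => // results matched
    }

    // Compare two aggregate rows within an absolute tolerance; non-numeric
    // columns fall back to an exact equality check.
    checkAggregatesWithTol(Row("a", 0.3333), Row("a", 1.0 / 3.0), absTol = 1e-3)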

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
deleted file mode 100644
index 816576e..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.catalyst.plans
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.util._
-
-/**
- * Provides helper methods for comparing plans.
- */
-class PlanTest extends SparkFunSuite {
-
-  /**
-   * Since attribute references are given globally unique ids during analysis,
-   * we must normalize them to check if two different queries are identical.
-   */
-  protected def normalizeExprIds(plan: LogicalPlan) = {
-    plan transformAllExpressions {
-      case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
-      case a: Alias =>
-        Alias(a.child, a.name)(exprId = ExprId(0))
-    }
-  }
-
-  /** Fails the test if the two plans do not match */
-  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
-    val normalized1 = normalizeExprIds(plan1)
-    val normalized2 = normalizeExprIds(plan2)
-    if (normalized1 != normalized2) {
-      fail(
-        s"""
-          |== FAIL: Plans do not match ===
-          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
-         """.stripMargin)
-    }
-  }
-
-  /** Fails the test if the two expressions do not match */
-  protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
-    comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
-  }
-}
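
To make the ExprId normalization above concrete, a hypothetical check of the following form
succeeds only because normalizeExprIds rewrites every id to ExprId(0) before the trees are
compared (the attribute name and type are made up; org.apache.spark.sql.types.IntegerType is
assumed to be imported):

    // Two structurally identical predicates whose attributes carry different
    // globally unique ExprIds; compareExpressions still treats them as equal.
    val x1 = AttributeReference("x", IntegerType)()
    val x2 = AttributeReference("x", IntegerType)()
    compareExpressions(EqualTo(x1, Literal(1)), EqualTo(x2, Literal(1)))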

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/c53b9ff9/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
deleted file mode 100644
index ded94ba..0000000
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.Row
-import org.apache.spark.test.HivemallQueryTest
-
-final class HiveUdfSuite extends HivemallQueryTest {
-
-  import hiveContext.implicits._
-  import hiveContext._
-
-  test("hivemall_version") {
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION hivemall_version
-         |   AS '${classOf[hivemall.HivemallVersionUDF].getName}'
-       """.stripMargin)
-
-    checkAnswer(
-      sql(s"SELECT DISTINCT hivemall_version()"),
-      Row("0.4.2-rc.2")
-    )
-
-    // sql("DROP TEMPORARY FUNCTION IF EXISTS hivemall_version")
-    // reset()
-  }
-
-  test("train_logregr") {
-    TinyTrainData.registerTempTable("TinyTrainData")
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION train_logregr
-         |   AS '${classOf[hivemall.regression.LogressUDTF].getName}'
-       """.stripMargin)
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION add_bias
-         |   AS '${classOf[hivemall.ftvec.AddBiasUDFWrapper].getName}'
-       """.stripMargin)
-
-    val model = sql(
-      s"""
-         | SELECT feature, AVG(weight) AS weight
-         |   FROM (
-         |       SELECT train_logregr(add_bias(features), label) AS (feature, weight)
-         |         FROM TinyTrainData
-         |     ) t
-         |   GROUP BY feature
-       """.stripMargin)
-
-    checkAnswer(
-      model.select($"feature"),
-      Seq(Row("0"), Row("1"), Row("2"))
-    )
-
-    // TODO: Why is 'train_logregr' not registered in the HiveMetaStore?
-    // ERROR RetryingHMSHandler: MetaException(message:NoSuchObjectException
-    //   (message:Function default.train_logregr does not exist))
-    //
-    // hiveContext.sql("DROP TEMPORARY FUNCTION IF EXISTS train_logregr")
-    // hiveContext.reset()
-  }
-
-  test("each_top_k") {
-    val testDf = Seq(
-      ("a", "1", 0.5, Array(0, 1, 2)),
-      ("b", "5", 0.1, Array(3)),
-      ("a", "3", 0.8, Array(2, 5)),
-      ("c", "6", 0.3, Array(1, 3)),
-      ("b", "4", 0.3, Array(2)),
-      ("a", "2", 0.6, Array(1))
-    ).toDF("key", "value", "score", "data")
-
-    import testDf.sqlContext.implicits._
-    testDf.repartition($"key").sortWithinPartitions($"key").registerTempTable("TestData")
-    sql(s"""
-         | CREATE TEMPORARY FUNCTION each_top_k
-         |   AS '${classOf[hivemall.tools.EachTopKUDTF].getName}'
-       """.stripMargin)
-
-    // Compute top-1 rows for each group
-    assert(
-      sql("SELECT each_top_k(1, key, score, key, value) FROM TestData").collect.toSet ===
-      Set(
-        Row(1, 0.8, "a", "3"),
-        Row(1, 0.3, "b", "4"),
-        Row(1, 0.3, "c", "6")
-      ))
-
-    // Compute reverse top-1 rows for each group
-    assert(
-      sql("SELECT each_top_k(-1, key, score, key, value) FROM TestData").collect.toSet ===
-      Set(
-        Row(1, 0.5, "a", "1"),
-        Row(1, 0.1, "b", "5"),
-        Row(1, 0.3, "c", "6")
-      ))
-  }
-}
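
For readers unfamiliar with each_top_k, the top-1 assertion above is equivalent in effect to
the plain DataFrame computation sketched below (column names follow testDf as defined in the
removed test; org.apache.spark.sql.functions.max is assumed to be imported). Unlike
each_top_k, this join-based variant returns every tied row rather than a ranked top-1:

    // Keep, for each key, the row whose score is maximal.
    val top1 = testDf.groupBy($"key").agg(max($"score").as("score"))
      .join(testDf, Seq("key", "score"))
      .select($"key", $"value", $"score")
    // Expected rows: (a, 3, 0.8), (b, 4, 0.3), (c, 6, 0.3)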