Posted to commits@spark.apache.org by da...@apache.org on 2015/06/19 19:12:07 UTC

spark git commit: [SPARK-8207] [SQL] Add math function bin

Repository: spark
Updated Branches:
  refs/heads/master 43c7ec638 -> 2c59d5c12


[SPARK-8207] [SQL] Add math function bin

JIRA: https://issues.apache.org/jira/browse/SPARK-8207

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #6721 from viirya/expr_bin and squashes the following commits:

07e1c8f [Liang-Chi Hsieh] Remove AbstractUnaryMathExpression and let BIN inherit UnaryExpression.
0677f1a [Liang-Chi Hsieh] For comments.
cf62b95 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
0cf20f2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
dea9c12 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
d4f4774 [Liang-Chi Hsieh] Add @ignore_unicode_prefix.
7a0196f [Liang-Chi Hsieh] Fix python style.
ac2bacd [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
a0a2d0f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
4cb764d [Liang-Chi Hsieh] For comments.
0f78682 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
c0c3197 [Liang-Chi Hsieh] Add bin to FunctionRegistry.
824f761 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
50e0c3b [Liang-Chi Hsieh] Add math function bin(a: long): string.
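
A minimal usage sketch of the new function (a sketch only, assuming a Spark 1.5
shell with a SQLContext named sqlContext; the DataFrame and column names are
illustrative, not part of the patch):

    import org.apache.spark.sql.functions.bin
    import sqlContext.implicits._                // enables the $"..." column syntax

    // range() produces a LongType column, matching Bin's expected input type.
    val df = sqlContext.range(1, 4).toDF("a")    // rows: 1, 2, 3
    df.select(bin($"a")).collect()
    // Array([1], [10], [11])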


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c59d5c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c59d5c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c59d5c1

Branch: refs/heads/master
Commit: 2c59d5c12a0a02702839bfaf631505b8a311c5a9
Parents: 43c7ec6
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Jun 19 10:09:31 2015 -0700
Committer: Davies Liu <da...@databricks.com>
Committed: Fri Jun 19 10:09:31 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 14 ++++++++
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../spark/sql/catalyst/expressions/math.scala   | 33 ++++++++++++++++++-
 .../expressions/MathFunctionsSuite.scala        | 33 +++++++++++++++----
 .../scala/org/apache/spark/sql/functions.scala  | 18 +++++++++++
 .../spark/sql/DataFrameFunctionsSuite.scala     | 10 ++++++
 6 files changed, 101 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index acdb01d..cfa87ae 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -35,6 +35,7 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
 __all__ = [
     'array',
     'approxCountDistinct',
+    'bin',
     'coalesce',
     'countDistinct',
     'explode',
@@ -231,6 +232,19 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(1.5)
+def bin(col):
+    """Returns the string representation of the binary value of the given column.
+
+    >>> df.select(bin(df.age).alias('c')).collect()
+    [Row(c=u'10'), Row(c=u'101')]
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.bin(_to_java_column(col))
+    return Column(jc)
+
+
 @since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.

http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 13b2bb0..79273a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -103,6 +103,7 @@ object FunctionRegistry {
     expression[Asin]("asin"),
     expression[Atan]("atan"),
     expression[Atan2]("atan2"),
+    expression[Bin]("bin"),
     expression[Cbrt]("cbrt"),
     expression[Ceil]("ceil"),
     expression[Ceil]("ceiling"),

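Registering the expression under the name "bin" is what makes it resolvable from
SQL strings and selectExpr, in addition to the typed DataFrame API. A short
sketch (df, a, and t are assumed names, as above):

    df.selectExpr("bin(a)")                  // resolved through FunctionRegistry
    df.registerTempTable("t")
    sqlContext.sql("SELECT bin(a) FROM t")   // same resolution path
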
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index f79bf4a..250564d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.lang.{Long => JLong}
+
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.types.{DataType, DoubleType}
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A leaf expression specifically for math constants. Math constants expect no input.
@@ -207,6 +210,34 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
   override def funcName: String = "toRadians"
 }
 
+case class Bin(child: Expression)
+  extends UnaryExpression with Serializable with ExpectsInputTypes {
+
+  val name: String = "BIN"
+
+  override def foldable: Boolean = child.foldable
+  override def nullable: Boolean = true
+  override def toString: String = s"$name($child)"
+
+  override def expectedChildTypes: Seq[DataType] = Seq(LongType)
+  override def dataType: DataType = StringType
+
+  def funcName: String = name.toLowerCase
+
+  override def eval(input: catalyst.InternalRow): Any = {
+    val evalE = child.eval(input)
+    if (evalE == null) {
+      null
+    } else {
+      UTF8String.fromString(JLong.toBinaryString(evalE.asInstanceOf[Long]))
+    }
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    defineCodeGen(ctx, ev, (c) =>
+      s"${ctx.stringType}.fromString(java.lang.Long.toBinaryString($c))")
+  }
+}
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////////////////////////////

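One behavior worth noting: eval delegates to java.lang.Long.toBinaryString, so a
negative input is rendered as its unsigned 64-bit two's-complement bit pattern
rather than with a minus sign (this is what the -123L case in the test suite
below expects). A quick REPL check:

    scala> java.lang.Long.toBinaryString(12L)
    res0: String = 1100

    scala> java.lang.Long.toBinaryString(-123L)
    res1: String = 1111111111111111111111111111111111111111111111111111111110000101
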
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 21e9b92..0d1d5eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType}
 
 class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
@@ -41,16 +41,18 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
    * Used for testing unary math expressions.
    *
    * @param c expression
-   * @param f The functions in scala.math
+   * @param f The function in scala.math or elsewhere used to generate expected results
    * @param domain The set of values to run the function with
    * @param expectNull Whether the given values should return null or not
    * @tparam T Generic type for primitives
+   * @tparam U Generic type for the output of the given function `f`
    */
-  private def testUnary[T](
+  private def testUnary[T, U](
       c: Expression => Expression,
-      f: T => T,
+      f: T => U,
       domain: Iterable[T] = (-20 to 20).map(_ * 0.1),
-      expectNull: Boolean = false): Unit = {
+      expectNull: Boolean = false,
+      evalType: DataType = DoubleType): Unit = {
     if (expectNull) {
       domain.foreach { value =>
         checkEvaluation(c(Literal(value)), null, EmptyRow)
@@ -60,7 +62,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         checkEvaluation(c(Literal(value)), f(value), EmptyRow)
       }
     }
-    checkEvaluation(c(Literal.create(null, DoubleType)), null, create_row(null))
+    checkEvaluation(c(Literal.create(null, evalType)), null, create_row(null))
   }
 
   /**
@@ -168,7 +170,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("signum") {
-    testUnary[Double](Signum, math.signum)
+    testUnary[Double, Double](Signum, math.signum)
   }
 
   test("log") {
@@ -186,6 +188,23 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true)
   }
 
+  test("bin") {
+    testUnary(Bin, java.lang.Long.toBinaryString, (-20 to 20).map(_.toLong), evalType = LongType)
+
+    val row = create_row(null, 12L, 123L, 1234L, -123L)
+    val l1 = 'a.long.at(0)
+    val l2 = 'a.long.at(1)
+    val l3 = 'a.long.at(2)
+    val l4 = 'a.long.at(3)
+    val l5 = 'a.long.at(4)
+
+    checkEvaluation(Bin(l1), null, row)
+    checkEvaluation(Bin(l2), java.lang.Long.toBinaryString(12), row)
+    checkEvaluation(Bin(l3), java.lang.Long.toBinaryString(123), row)
+    checkEvaluation(Bin(l4), java.lang.Long.toBinaryString(1234), row)
+    checkEvaluation(Bin(l5), java.lang.Long.toBinaryString(-123), row)
+  }
+
   test("log2") {
     def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2)
     testUnary(Log2, f, (0 to 20).map(_ * 0.1))

http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d8a91be..40ae9f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -890,6 +890,24 @@ object functions {
   def atan2(l: Double, rightName: String): Column = atan2(l, Column(rightName))
 
   /**
+   * An expression that returns the string representation of the binary value of the given long
+   * column. For example, for an input of 12, it returns "1100".
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def bin(e: Column): Column = Bin(e.expr)
+
+  /**
+   * An expression that returns the string representation of the binary value of the given long
+   * column. For example, for an input of 12, it returns "1100".
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def bin(columnName: String): Column = bin(Column(columnName))
+
+  /**
    * Computes the cube-root of the given value.
    *
    * @group math_funcs

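The two overloads differ only in call style; both construct the same Bin
expression (a sketch, again assuming a DataFrame df with a long column a):

    df.select(bin($"a"))    // Column overload
    df.select(bin("a"))     // convenience overload taking a column name
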
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index cfd2386..70819fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -110,6 +110,16 @@ class DataFrameFunctionsSuite extends QueryTest {
       testData2.collect().toSeq.map(r => Row(~r.getInt(0))))
   }
 
+  test("bin") {
+    val df = Seq[(Integer, Integer)]((12, null)).toDF("a", "b")
+    checkAnswer(
+      df.select(bin("a"), bin("b")),
+      Row("1100", null))
+    checkAnswer(
+      df.selectExpr("bin(a)", "bin(b)"),
+      Row("1100", null))
+  }
+
   test("if function") {
     val df = Seq((1, 2)).toDF("a", "b")
     checkAnswer(

