You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2015/06/19 19:12:07 UTC
spark git commit: [SPARK-8207] [SQL] Add math function bin
Repository: spark
Updated Branches:
refs/heads/master 43c7ec638 -> 2c59d5c12
[SPARK-8207] [SQL] Add math function bin
JIRA: https://issues.apache.org/jira/browse/SPARK-8207
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #6721 from viirya/expr_bin and squashes the following commits:
07e1c8f [Liang-Chi Hsieh] Remove AbstractUnaryMathExpression and let BIN inherit UnaryExpression.
0677f1a [Liang-Chi Hsieh] For comments.
cf62b95 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
0cf20f2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
dea9c12 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
d4f4774 [Liang-Chi Hsieh] Add @ignore_unicode_prefix.
7a0196f [Liang-Chi Hsieh] Fix python style.
ac2bacd [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
a0a2d0f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
4cb764d [Liang-Chi Hsieh] For comments.
0f78682 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
c0c3197 [Liang-Chi Hsieh] Add bin to FunctionRegistry.
824f761 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
50e0c3b [Liang-Chi Hsieh] Add math function bin(a: long): string.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c59d5c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c59d5c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c59d5c1
Branch: refs/heads/master
Commit: 2c59d5c12a0a02702839bfaf631505b8a311c5a9
Parents: 43c7ec6
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Jun 19 10:09:31 2015 -0700
Committer: Davies Liu <da...@databricks.com>
Committed: Fri Jun 19 10:09:31 2015 -0700
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 14 ++++++++
.../catalyst/analysis/FunctionRegistry.scala | 1 +
.../spark/sql/catalyst/expressions/math.scala | 33 ++++++++++++++++++-
.../expressions/MathFunctionsSuite.scala | 34 ++++++++++++++++----
.../scala/org/apache/spark/sql/functions.scala | 18 +++++++++++
.../spark/sql/DataFrameFunctionsSuite.scala | 10 ++++++
6 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index acdb01d..cfa87ae 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -35,6 +35,7 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
__all__ = [
'array',
'approxCountDistinct',
+ 'bin',
'coalesce',
'countDistinct',
'explode',
@@ -231,6 +232,19 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)
+@ignore_unicode_prefix
+@since(1.5)
+def bin(col):
+ """Returns the string representation of the binary value of the given column.
+
+ >>> df.select(bin(df.age).alias('c')).collect()
+ [Row(c=u'10'), Row(c=u'101')]
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.bin(_to_java_column(col))
+ return Column(jc)
+
+
@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 13b2bb0..79273a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -103,6 +103,7 @@ object FunctionRegistry {
expression[Asin]("asin"),
expression[Atan]("atan"),
expression[Atan2]("atan2"),
+ expression[Bin]("bin"),
expression[Cbrt]("cbrt"),
expression[Ceil]("ceil"),
expression[Ceil]("ceiling"),
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index f79bf4a..250564d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.catalyst.expressions
+import java.lang.{Long => JLong}
+
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.types.{DataType, DoubleType}
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
/**
* A leaf expression specifically for math constants. Math constants expect no input.
@@ -207,6 +210,34 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
override def funcName: String = "toRadians"
}
+case class Bin(child: Expression)
+ extends UnaryExpression with Serializable with ExpectsInputTypes {
+
+ val name: String = "BIN"
+
+ override def foldable: Boolean = child.foldable
+ override def nullable: Boolean = true
+ override def toString: String = s"$name($child)"
+
+ override def expectedChildTypes: Seq[DataType] = Seq(LongType)
+ override def dataType: DataType = StringType
+
+ def funcName: String = name.toLowerCase
+
+ override def eval(input: catalyst.InternalRow): Any = {
+ val evalE = child.eval(input)
+ if (evalE == null) {
+ null
+ } else {
+ UTF8String.fromString(JLong.toBinaryString(evalE.asInstanceOf[Long]))
+ }
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ defineCodeGen(ctx, ev, (c) =>
+ s"${ctx.stringType}.fromString(java.lang.Long.toBinaryString($c))")
+ }
+}
////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 21e9b92..0d1d5eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType}
class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -41,16 +42,18 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
* Used for testing unary math expressions.
*
* @param c expression
- * @param f The functions in scala.math
+ * @param f The functions in scala.math or elsewhere used to generate expected results
* @param domain The set of values to run the function with
* @param expectNull Whether the given values should return null or not
* @tparam T Generic type for primitives
+ * @tparam U Generic type for the output of the given function `f`
*/
- private def testUnary[T](
+ private def testUnary[T, U](
c: Expression => Expression,
- f: T => T,
+ f: T => U,
domain: Iterable[T] = (-20 to 20).map(_ * 0.1),
- expectNull: Boolean = false): Unit = {
+ expectNull: Boolean = false,
+ evalType: DataType = DoubleType): Unit = {
if (expectNull) {
domain.foreach { value =>
checkEvaluation(c(Literal(value)), null, EmptyRow)
@@ -60,7 +63,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(c(Literal(value)), f(value), EmptyRow)
}
}
- checkEvaluation(c(Literal.create(null, DoubleType)), null, create_row(null))
+ checkEvaluation(c(Literal.create(null, evalType)), null, create_row(null))
}
/**
@@ -168,7 +171,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("signum") {
- testUnary[Double](Signum, math.signum)
+ testUnary[Double, Double](Signum, math.signum)
}
test("log") {
@@ -186,6 +189,23 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true)
}
+ test("bin") {
+ testUnary(Bin, java.lang.Long.toBinaryString, (-20 to 20).map(_.toLong), evalType = LongType)
+
+ val row = create_row(null, 12L, 123L, 1234L, -123L)
+ val l1 = 'a.long.at(0)
+ val l2 = 'a.long.at(1)
+ val l3 = 'a.long.at(2)
+ val l4 = 'a.long.at(3)
+ val l5 = 'a.long.at(4)
+
+ checkEvaluation(Bin(l1), null, row)
+ checkEvaluation(Bin(l2), java.lang.Long.toBinaryString(12), row)
+ checkEvaluation(Bin(l3), java.lang.Long.toBinaryString(123), row)
+ checkEvaluation(Bin(l4), java.lang.Long.toBinaryString(1234), row)
+ checkEvaluation(Bin(l5), java.lang.Long.toBinaryString(-123), row)
+ }
+
test("log2") {
def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2)
testUnary(Log2, f, (0 to 20).map(_ * 0.1))
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d8a91be..40ae9f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -890,6 +890,24 @@ object functions {
def atan2(l: Double, rightName: String): Column = atan2(l, Column(rightName))
/**
+ * An expression that returns the string representation of the binary value of the given long
+ * column. For example, for a column holding the value 12, bin returns "1100".
+ *
+ * @group math_funcs
+ * @since 1.5.0
+ */
+ def bin(e: Column): Column = Bin(e.expr)
+
+ /**
+ * An expression that returns the string representation of the binary value of the given long
+ * column. For example, if column "a" holds the value 12, bin("a") returns "1100".
+ *
+ * @group math_funcs
+ * @since 1.5.0
+ */
+ def bin(columnName: String): Column = bin(Column(columnName))
+
+ /**
* Computes the cube-root of the given value.
*
* @group math_funcs
http://git-wip-us.apache.org/repos/asf/spark/blob/2c59d5c1/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index cfd2386..70819fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -110,6 +110,16 @@ class DataFrameFunctionsSuite extends QueryTest {
testData2.collect().toSeq.map(r => Row(~r.getInt(0))))
}
+ test("bin") {
+ val df = Seq[(Integer, Integer)]((12, null)).toDF("a", "b")
+ checkAnswer(
+ df.select(bin("a"), bin("b")),
+ Row("1100", null))
+ checkAnswer(
+ df.selectExpr("bin(a)", "bin(b)"),
+ Row("1100", null))
+ }
+
test("if function") {
val df = Seq((1, 2)).toDF("a", "b")
checkAnswer(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org