You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2015/06/30 18:50:14 UTC

spark git commit: [SPARK-8236] [SQL] misc functions: crc32

Repository: spark
Updated Branches:
  refs/heads/master a48e61915 -> 722aa5f48


[SPARK-8236] [SQL] misc functions: crc32

https://issues.apache.org/jira/browse/SPARK-8236

Author: Shilei <sh...@intel.com>

Closes #7108 from qiansl127/Crc32 and squashes the following commits:

5477352 [Shilei] Change to AutoCastInputTypes
5f16e5d [Shilei] Add misc function crc32


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/722aa5f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/722aa5f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/722aa5f4

Branch: refs/heads/master
Commit: 722aa5f48ec105bf23eee2361adddfe3a0cd6fc4
Parents: a48e619
Author: Shilei <sh...@intel.com>
Authored: Tue Jun 30 09:49:58 2015 -0700
Committer: Davies Liu <da...@databricks.com>
Committed: Tue Jun 30 09:49:58 2015 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../spark/sql/catalyst/expressions/misc.scala   | 40 ++++++++++++++++++++
 .../expressions/MiscFunctionsSuite.scala        |  8 ++++
 .../scala/org/apache/spark/sql/functions.scala  | 16 ++++++++
 .../spark/sql/DataFrameFunctionsSuite.scala     | 11 ++++++
 5 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/722aa5f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index b17457d..d53eaed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -139,6 +139,7 @@ object FunctionRegistry {
     expression[Sha2]("sha2"),
     expression[Sha1]("sha1"),
     expression[Sha1]("sha"),
+    expression[Crc32]("crc32"),
 
     // aggregate functions
     expression[Average]("avg"),

http://git-wip-us.apache.org/repos/asf/spark/blob/722aa5f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 27805bf..a7bcbe4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.security.MessageDigest
 import java.security.NoSuchAlgorithmException
+import java.util.zip.CRC32
 
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -168,3 +169,42 @@ case class Sha1(child: Expression) extends UnaryExpression with AutoCastInputTyp
     )
   }
 }
+
+/**
+ * Computes the CRC32 checksum of a binary value and returns it as a bigint ([[LongType]]).
+ * The child is expected to be of [[BinaryType]]; a null input evaluates to null.
+ */
+case class Crc32(child: Expression)
+  extends UnaryExpression with AutoCastInputTypes {
+
+  override def expectedChildTypes: Seq[DataType] = Seq(BinaryType)
+
+  override def dataType: DataType = LongType
+
+  override def eval(input: InternalRow): Any = {
+    child.eval(input) match {
+      case null => null
+      case raw =>
+        val bytes = raw.asInstanceOf[Array[Byte]]
+        val crc = new CRC32
+        crc.update(bytes, 0, bytes.length)
+        crc.getValue
+    }
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val childGen = child.gen(ctx)
+    val crcClass = "java.util.zip.CRC32"
+    s"""
+      ${childGen.code}
+      boolean ${ev.isNull} = ${childGen.isNull};
+      long ${ev.primitive} = ${ctx.defaultValue(dataType)};
+      if (!${ev.isNull}) {
+        $crcClass checksum = new $crcClass();
+        checksum.update(${childGen.primitive}, 0, ${childGen.primitive}.length);
+        ${ev.primitive} = checksum.getValue();
+      }
+    """
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/722aa5f4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
index 36e636b..b524d0a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
@@ -49,4 +49,12 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(Sha2(Literal("ABC".getBytes), Literal.create(null, IntegerType)), null)
     checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal.create(null, IntegerType)), null)
   }
+
+  test("crc32") {
+    val sixBytes = Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)
+    checkEvaluation(Crc32(Literal("ABC".getBytes)), 2743272264L)
+    checkEvaluation(Crc32(sixBytes), 2180413220L)
+    checkEvaluation(Crc32(Literal.create(null, BinaryType)), null)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/722aa5f4/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4d9a019..6331fe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1466,6 +1466,22 @@ object functions {
    */
   def sha2(columnName: String, numBits: Int): Column = sha2(Column(columnName), numBits)
 
+  /**
+   * Calculates the CRC32 (cyclic redundancy check) value of a column and returns it as a bigint.
+   *
+   * @group misc_funcs
+   * @since 1.5.0
+   */
+  def crc32(e: Column): Column = Crc32(e.expr)
+
+  /**
+   * Calculates the CRC32 (cyclic redundancy check) value of the named column as a bigint.
+   *
+   * @group misc_funcs
+   * @since 1.5.0
+   */
+  def crc32(columnName: String): Column = crc32(Column(columnName))
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // String functions
   //////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/722aa5f4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index abfd47c..11a8767 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -173,6 +173,17 @@ class DataFrameFunctionsSuite extends QueryTest {
     }
   }
 
+  test("misc crc32 function") {
+    val df = Seq(("ABC", Array[Byte](1, 2, 3, 4, 5, 6))).toDF("a", "b")
+    val expected = Row(2743272264L, 2180413220L)
+
+    // Both crc32 overloads (Column and column name) go through the DataFrame API.
+    checkAnswer(df.select(crc32($"a"), crc32("b")), expected)
+
+    // The same function invoked through SQL expression strings must match.
+    checkAnswer(df.selectExpr("crc32(a)", "crc32(b)"), expected)
+  }
+
   test("string length function") {
     checkAnswer(
       nullStrings.select(strlen($"s"), strlen("s")),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org