You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/13 03:08:47 UTC

spark git commit: [SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash

Repository: spark
Updated Branches:
  refs/heads/master 0a4d06a7c -> 945668854


[SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash

## What changes were proposed in this pull request?

- Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive
- Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood, which limit the ability to stay completely in sync with Hive's hashing function. I have explained this in the method doc.
- Date type was already supported. This PR adds a test for that.

## How was this patch tested?

Added unit tests

Author: Tejas Patil <te...@fb.com>

Closes #17062 from tejasapatil/SPARK-17495_time_related_types.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94566885
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94566885
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94566885

Branch: refs/heads/master
Commit: 9456688547522a62f1e7520e9b3564550c57aa5d
Parents: 0a4d06a
Author: Tejas Patil <te...@fb.com>
Authored: Sun Mar 12 20:08:44 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Sun Mar 12 20:08:44 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/expressions/hash.scala   |  71 ++++++-
 .../expressions/HashExpressionsSuite.scala      | 208 ++++++++++++++++++-
 2 files changed, 268 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94566885/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 03101b4..2a5963d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
     }
   }
 
+  protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)
+
   protected def genHashCalendarInterval(input: String, result: String): String = {
     val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
     s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
     case NullType => ""
     case BooleanType => genHashBoolean(input, result)
     case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
-    case LongType | TimestampType => genHashLong(input, result)
+    case LongType => genHashLong(input, result)
+    case TimestampType => genHashTimestamp(input, result)
     case FloatType => genHashFloat(input, result)
     case DoubleType => genHashDouble(input, result)
     case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -433,6 +436,10 @@ abstract class InterpretedHashFunction {
 
   protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
 
+  /**
+   * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
+   * of input `value`.
+   */
   def hash(value: Any, dataType: DataType, seed: Long): Long = {
     value match {
       case null => seed
@@ -580,8 +587,6 @@ object XxHash64Function extends InterpretedHashFunction {
  *
  * We should use this hash function for both shuffle and bucket of Hive tables, so that
  * we can guarantee shuffle and bucketing have same data distribution
- *
- * TODO: Support date related types
  */
 @ExpressionDescription(
   usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +653,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
 
   override protected def genHashCalendarInterval(input: String, result: String): String = {
     s"""
-        $result = (31 * $hasherClassName.hashInt($input.months)) +
-          $hasherClassName.hashLong($input.microseconds);"
+      $result = (int)
+        ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
      """
   }
 
+  override protected def genHashTimestamp(input: String, result: String): String =
+    s"""
+      $result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
+     """
+
   override protected def genHashString(input: String, result: String): String = {
     val baseObject = s"$input.getBaseObject()"
     val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +791,49 @@ object HiveHashFunction extends InterpretedHashFunction {
     result
   }
 
+  /**
+   * Mimics TimestampWritable.hashCode() in Hive
+   */
+  def hashTimestamp(timestamp: Long): Long = {
+    val timestampInSeconds = timestamp / 1000000
+    val nanoSecondsPortion = (timestamp % 1000000) * 1000
+
+    var result = timestampInSeconds
+    result <<= 30 // the nanosecond part fits in 30 bits
+    result |= nanoSecondsPortion
+    ((result >>> 32) ^ result).toInt
+  }
+
+  /**
+   * Hive allows input intervals to be defined using units below but the intervals
+   * have to be from the same category:
+   * - year, month (stored as HiveIntervalYearMonth)
+   * - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
+   *
+   * eg. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
+   *
+   * This method mimics HiveIntervalDayTime.hashCode() in Hive.
+   *
+   * Two differences wrt Hive due to how intervals are stored in Spark vs Hive:
+   *
+   * - If the `INTERVAL` is backed as HiveIntervalYearMonth in Hive, then this method will not
+   *   produce Hive compatible result. The reason being Spark's representation of calendar does not
+   *   have such categories based on the interval and is unified.
+   *
+   * - Spark's [[CalendarInterval]] has precision upto microseconds but Hive's
+   *   HiveIntervalDayTime can store data with precision upto nanoseconds. So, any input intervals
+   *   with nanosecond values will lead to wrong output hashes (ie. non adherent with Hive output)
+   */
+  def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
+    val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
+    val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
+
+    val nanoSeconds =
+      (calendarInterval.microseconds -
+        (totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
+     (result * 37) + nanoSeconds
+  }
+
   override def hash(value: Any, dataType: DataType, seed: Long): Long = {
     value match {
       case null => 0
@@ -834,10 +887,10 @@ object HiveHashFunction extends InterpretedHashFunction {
         }
         result
 
-      case d: Decimal =>
-        normalizeDecimal(d.toJavaBigDecimal).hashCode()
-
-      case _ => super.hash(value, dataType, seed)
+      case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+      case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+      case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+      case _ => super.hash(value, dataType, 0)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94566885/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 0c77dc2..59fc8ea 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -18,18 +18,20 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.nio.charset.StandardCharsets
+import java.util.TimeZone
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.codec.digest.DigestUtils
+import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("0000-01-01", -719530)
+    checkHiveHashForDateType("9999-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+    // with higher precision
+    checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)
+
+    // with different timezone
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+      TimeZone.getTimeZone("US/Pacific"))
+
+    // boundary cases
+    checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+    checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)
+
+    // epoch
+    checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+    // before epoch
+    checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+    // Invalid input: bad timestamp string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
+
+    // Invalid input: February 30th is a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
+
+    // Invalid input: Hive accepts upto 9 decimal place precision but Spark uses upto 6
+    intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
+  }
+
+  test("hive-hash for CalendarInterval type") {
+    def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
+      checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
+    }
+
+    // ----- MICROSEC -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 microsecond", 24273)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 microsecond", 22273)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 microsecond", 23273)
+    checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
+    checkHiveHashForIntervalType("interval -999 microsecond", -975727)
+
+    // ----- MILLISEC -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 millisecond", 1023273)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 millisecond", -976727)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 millisecond", 23273)
+    checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
+    checkHiveHashForIntervalType("interval -999 millisecond", -998976727)
+
+    // ----- SECOND -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 second", 23310)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 second", 23273)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 second", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
+    checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 second", 0)
+
+    // ----- MINUTE -----
+
+    // basic cases
+    checkHiveHashForIntervalType("interval 1 minute", 25493)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 minute", 25456)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 minute", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
+    checkHiveHashForIntervalType("interval -2147483648 minute", 22163)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 minute", 0)
+
+    // ----- HOUR -----
+
+    // basic case
+    checkHiveHashForIntervalType("interval 1 hour", 156473)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 hour", 156436)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 hour", 23273)
+    checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
+    checkHiveHashForIntervalType("interval -2147483648 hour", -43327)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 hour", 0)
+
+    // ----- DAY -----
+
+    // basic cases
+    checkHiveHashForIntervalType("interval 1 day", 3220073)
+
+    // negative
+    checkHiveHashForIntervalType("interval -1 day", 3220036)
+
+    // edge / boundary cases
+    checkHiveHashForIntervalType("interval 0 day", 23273)
+    checkHiveHashForIntervalType("interval 106751991 day", -451506760)
+    checkHiveHashForIntervalType("interval -106751991 day", -451514123)
+
+    // Hive supports `day` for a longer range but Spark's range is smaller
+    // The check for range is done at the parser level so this does not fail in Spark
+    // checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
+    // checkHiveHashForIntervalType("interval 2147483647 day", -4767228)
+
+    // Out of range for both Hive and Spark
+    // Hive throws an exception. Spark overflows and returns wrong output
+    // checkHiveHashForIntervalType("interval 9999999999 day", 0)
+
+    // ----- MIX -----
+
+    checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
+    checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
+    checkHiveHashForIntervalType(
+      "interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)
+
+    checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
+    checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
+    checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
+      -2128468593)
+    checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
+      1199697904)
+    checkHiveHashForIntervalType(
+      "interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
+  }
+
   test("hive-hash for array") {
     // empty array
     checkHiveHash(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org