You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/16 20:58:55 UTC

[spark] branch master updated: [SPARK-27735][SS] Parsing interval string should be case-insensitive in SS

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a317c8  [SPARK-27735][SS] Parsing interval string should be case-insensitive in SS
6a317c8 is described below

commit 6a317c8f014557dfd60931bd1eac3f545520d939
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Thu May 16 13:58:27 2019 -0700

    [SPARK-27735][SS] Parsing interval string should be case-insensitive in SS
    
    ## What changes were proposed in this pull request?
    
    Some APIs in Structured Streaming require the user to specify an interval. Right now these APIs don't accept upper-case strings.
    
    This PR adds a new method `fromCaseInsensitiveString` to `CalendarInterval` to support parsing upper-case strings, and fixes all APIs that need to parse an interval string.
    
    ## How was this patch tested?
    
    The new unit test.
    
    Closes #24619 from zsxwing/SPARK-27735.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/unsafe/types/CalendarInterval.java       | 25 ++++++++++++++++++++++
 .../spark/unsafe/types/CalendarIntervalSuite.java  | 25 ++++++++++++++++++++++
 .../sql/catalyst/expressions/TimeWindow.scala      | 17 +--------------
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 10 +++++++--
 .../sql/execution/streaming/GroupStateImpl.scala   | 17 +--------------
 .../streaming/continuous/ContinuousTrigger.scala   | 15 +------------
 .../spark/sql/streaming/ProcessingTime.scala       | 15 +------------
 7 files changed, 62 insertions(+), 62 deletions(-)

diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index 611b2a2..e36efa3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -18,6 +18,7 @@
 package org.apache.spark.unsafe.types;
 
 import java.io.Serializable;
+import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -66,6 +67,10 @@ public final class CalendarInterval implements Serializable {
     }
   }
 
+  /**
+   * Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
+   * This method is case-sensitive and all characters in the input string should be in lower case.
+   */
   public static CalendarInterval fromString(String s) {
     if (s == null) {
       return null;
@@ -87,6 +92,26 @@ public final class CalendarInterval implements Serializable {
     }
   }
 
+  /**
+   * Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and
+   * will throw IllegalArgumentException when the input string is not a valid interval.
+   *
+   * @throws IllegalArgumentException if the string is not a valid interval.
+   */
+  public static CalendarInterval fromCaseInsensitiveString(String s) {
+    if (s == null || s.trim().isEmpty()) {
+      throw new IllegalArgumentException("Interval cannot be null or blank.");
+    }
+    String sInLowerCase = s.trim().toLowerCase(Locale.ROOT);
+    String interval =
+      sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase;
+    CalendarInterval cal = fromString(interval);
+    if (cal == null) {
+      throw new IllegalArgumentException("Invalid interval: " + s);
+    }
+    return cal;
+  }
+
   public static long toLongWithRange(String fieldName,
       String s, long minValue, long maxValue) throws IllegalArgumentException {
     long result = 0;
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
index 1e55691..994af8f 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
@@ -105,6 +105,31 @@ public class CalendarIntervalSuite {
   }
 
   @Test
+  public void fromCaseInsensitiveStringTest() {
+    for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
+      assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000));
+    }
+
+    for (String input : new String[]{null, "", " "}) {
+      try {
+        fromCaseInsensitiveString(input);
+        fail("Expected to throw an exception for the invalid input");
+      } catch (IllegalArgumentException e) {
+        assertTrue(e.getMessage().contains("cannot be null or blank"));
+      }
+    }
+
+    for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
+      try {
+        fromCaseInsensitiveString(input);
+        fail("Expected to throw an exception for the invalid input");
+      } catch (IllegalArgumentException e) {
+        assertTrue(e.getMessage().contains("Invalid interval"));
+      }
+    }
+  }
+
+  @Test
   public void fromYearMonthStringTest() {
     String input;
     CalendarInterval i;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 8e48856..9aae678 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
@@ -104,20 +102,7 @@ object TimeWindow {
    *         precision.
    */
   private def getIntervalInMicroSeconds(interval: String): Long = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "The window duration, slide duration and start time cannot be null or blank.")
-    }
-    val intervalString = if (interval.startsWith("interval")) {
-      interval
-    } else {
-      "interval " + interval
-    }
-    val cal = CalendarInterval.fromString(intervalString)
-    if (cal == null) {
-      throw new IllegalArgumentException(
-        s"The provided interval ($interval) did not correspond to a valid interval string.")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(
         s"Intervals greater than a month is not supported ($interval).")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5d6e530..74cb3e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -695,8 +695,14 @@ class Dataset[T] private[sql](
   // defined on a derived column cannot referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
     val parsedDelay =
-      Option(CalendarInterval.fromString("interval " + delayThreshold))
-        .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+      try {
+        CalendarInterval.fromCaseInsensitiveString(delayThreshold)
+      } catch {
+        case e: IllegalArgumentException =>
+          throw new AnalysisException(
+            s"Unable to parse time delay '$delayThreshold'",
+            cause = Some(e))
+      }
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index fcb230b..dda9d41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming
 import java.sql.Date
 import java.util.concurrent.TimeUnit
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
 import org.apache.spark.sql.execution.streaming.GroupStateImpl._
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
@@ -161,20 +159,7 @@ private[sql] class GroupStateImpl[S] private(
   def getTimeoutTimestamp: Long = timeoutTimestamp
 
   private def parseDuration(duration: String): Long = {
-    if (StringUtils.isBlank(duration)) {
-      throw new IllegalArgumentException(
-        "Provided duration is null or blank.")
-    }
-    val intervalString = if (duration.startsWith("interval")) {
-      duration
-    } else {
-      "interval " + duration
-    }
-    val cal = CalendarInterval.fromString(intervalString)
-    if (cal == null) {
-      throw new IllegalArgumentException(
-        s"Provided duration ($duration) is not valid.")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(duration)
     if (cal.milliseconds < 0 || cal.months < 0) {
       throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
index fd0ff31..bd343f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger {
 
 private[sql] object ContinuousTrigger {
   def apply(interval: String): ContinuousTrigger = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 38b0776..417d698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -76,18 +74,7 @@ object ProcessingTime {
    */
   @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: String): ProcessingTime = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org