You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/16 20:58:55 UTC
[spark] branch master updated: [SPARK-27735][SS] Parsing interval
string should be case-insensitive in SS
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6a317c8 [SPARK-27735][SS] Parsing interval string should be case-insensitive in SS
6a317c8 is described below
commit 6a317c8f014557dfd60931bd1eac3f545520d939
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Thu May 16 13:58:27 2019 -0700
[SPARK-27735][SS] Parsing interval string should be case-insensitive in SS
## What changes were proposed in this pull request?
Some APIs in Structured Streaming require the user to specify an interval. Right now these APIs don't accept upper-case strings.
This PR adds a new method `fromCaseInsensitiveString` to `CalendarInterval` to support parsing upper-case strings, and fixes all APIs that need to parse an interval string.
## How was this patch tested?
The new unit test.
Closes #24619 from zsxwing/SPARK-27735.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/unsafe/types/CalendarInterval.java | 25 ++++++++++++++++++++++
.../spark/unsafe/types/CalendarIntervalSuite.java | 25 ++++++++++++++++++++++
.../sql/catalyst/expressions/TimeWindow.scala | 17 +--------------
.../main/scala/org/apache/spark/sql/Dataset.scala | 10 +++++++--
.../sql/execution/streaming/GroupStateImpl.scala | 17 +--------------
.../streaming/continuous/ContinuousTrigger.scala | 15 +------------
.../spark/sql/streaming/ProcessingTime.scala | 15 +------------
7 files changed, 62 insertions(+), 62 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index 611b2a2..e36efa3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;
import java.io.Serializable;
+import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -66,6 +67,10 @@ public final class CalendarInterval implements Serializable {
}
}
+ /**
+ * Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
+ * This method is case-sensitive and all characters in the input string should be in lower case.
+ */
public static CalendarInterval fromString(String s) {
if (s == null) {
return null;
@@ -87,6 +92,26 @@ public final class CalendarInterval implements Serializable {
}
}
+ /**
+ * Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and
+ * will throw IllegalArgumentException when the input string is not a valid interval.
+ *
+ * @throws IllegalArgumentException if the string is not a valid interval.
+ */
+ public static CalendarInterval fromCaseInsensitiveString(String s) {
+ if (s == null || s.trim().isEmpty()) {
+ throw new IllegalArgumentException("Interval cannot be null or blank.");
+ }
+ String sInLowerCase = s.trim().toLowerCase(Locale.ROOT);
+ String interval =
+ sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase;
+ CalendarInterval cal = fromString(interval);
+ if (cal == null) {
+ throw new IllegalArgumentException("Invalid interval: " + s);
+ }
+ return cal;
+ }
+
public static long toLongWithRange(String fieldName,
String s, long minValue, long maxValue) throws IllegalArgumentException {
long result = 0;
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
index 1e55691..994af8f 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
@@ -105,6 +105,31 @@ public class CalendarIntervalSuite {
}
@Test
+ public void fromCaseInsensitiveStringTest() {
+ for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
+ assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000));
+ }
+
+ for (String input : new String[]{null, "", " "}) {
+ try {
+ fromCaseInsensitiveString(input);
+ fail("Expected to throw an exception for the invalid input");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("cannot be null or blank"));
+ }
+ }
+
+ for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
+ try {
+ fromCaseInsensitiveString(input);
+ fail("Expected to throw an exception for the invalid input");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Invalid interval"));
+ }
+ }
+ }
+
+ @Test
public void fromYearMonthStringTest() {
String input;
CalendarInterval i;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 8e48856..9aae678 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
@@ -104,20 +102,7 @@ object TimeWindow {
* precision.
*/
private def getIntervalInMicroSeconds(interval: String): Long = {
- if (StringUtils.isBlank(interval)) {
- throw new IllegalArgumentException(
- "The window duration, slide duration and start time cannot be null or blank.")
- }
- val intervalString = if (interval.startsWith("interval")) {
- interval
- } else {
- "interval " + interval
- }
- val cal = CalendarInterval.fromString(intervalString)
- if (cal == null) {
- throw new IllegalArgumentException(
- s"The provided interval ($interval) did not correspond to a valid interval string.")
- }
+ val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5d6e530..74cb3e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -695,8 +695,14 @@ class Dataset[T] private[sql](
// defined on a derived column cannot referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
val parsedDelay =
- Option(CalendarInterval.fromString("interval " + delayThreshold))
- .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+ try {
+ CalendarInterval.fromCaseInsensitiveString(delayThreshold)
+ } catch {
+ case e: IllegalArgumentException =>
+ throw new AnalysisException(
+ s"Unable to parse time delay '$delayThreshold'",
+ cause = Some(e))
+ }
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index fcb230b..dda9d41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming
import java.sql.Date
import java.util.concurrent.TimeUnit
-import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.execution.streaming.GroupStateImpl._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
@@ -161,20 +159,7 @@ private[sql] class GroupStateImpl[S] private(
def getTimeoutTimestamp: Long = timeoutTimestamp
private def parseDuration(duration: String): Long = {
- if (StringUtils.isBlank(duration)) {
- throw new IllegalArgumentException(
- "Provided duration is null or blank.")
- }
- val intervalString = if (duration.startsWith("interval")) {
- duration
- } else {
- "interval " + duration
- }
- val cal = CalendarInterval.fromString(intervalString)
- if (cal == null) {
- throw new IllegalArgumentException(
- s"Provided duration ($duration) is not valid.")
- }
+ val cal = CalendarInterval.fromCaseInsensitiveString(duration)
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
index fd0ff31..bd343f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
-import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger {
private[sql] object ContinuousTrigger {
def apply(interval: String): ContinuousTrigger = {
- if (StringUtils.isBlank(interval)) {
- throw new IllegalArgumentException(
- "interval cannot be null or blank.")
- }
- val cal = if (interval.startsWith("interval")) {
- CalendarInterval.fromString(interval)
- } else {
- CalendarInterval.fromString("interval " + interval)
- }
- if (cal == null) {
- throw new IllegalArgumentException(s"Invalid interval: $interval")
- }
+ val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 38b0776..417d698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
-import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.annotation.Evolving
import org.apache.spark.unsafe.types.CalendarInterval
@@ -76,18 +74,7 @@ object ProcessingTime {
*/
@deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
- if (StringUtils.isBlank(interval)) {
- throw new IllegalArgumentException(
- "interval cannot be null or blank.")
- }
- val cal = if (interval.startsWith("interval")) {
- CalendarInterval.fromString(interval)
- } else {
- CalendarInterval.fromString("interval " + interval)
- }
- if (cal == null) {
- throw new IllegalArgumentException(s"Invalid interval: $interval")
- }
+ val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org