You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/11 12:36:56 UTC

[flink] branch master updated (dafd488 -> 1426b21)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dafd488  [hotfix] [docs] fix typo in docs/README.md
     new 768e81f  [FLINK-13198][core] Add a utility to parse String to Duration
     new 1426b21  [FLINK-13198][table-api] Add a utility to validate and parse duration in DescriptorProperties

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/flink/util/TimeUtils.java | 128 ++++++++++++++++++++
 .../java/org/apache/flink/util/TimeUtilsTest.java  | 131 +++++++++++++++++++++
 .../table/descriptors/DescriptorProperties.java    |  65 ++++++++++
 3 files changed, 324 insertions(+)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java


[flink] 02/02: [FLINK-13198][table-api] Add a utility to validate and parse duration in DescriptorProperties

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1426b218678224344844e1f1bdd7f1b1c66e0a32
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 11 18:57:35 2019 +0800

    [FLINK-13198][table-api] Add a utility to validate and parse duration in DescriptorProperties
---
 .../table/descriptors/DescriptorProperties.java    | 65 ++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 446fb73..3606504 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -29,8 +29,10 @@ import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.table.utils.TypeStringUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;
 
 import java.math.BigDecimal;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -548,6 +550,26 @@ public class DescriptorProperties {
 	}
 
 	/**
+	 * Returns a Java {@link Duration} under the given key if it exists; an unparsable value raises a ValidationException.
+	 */
+	public Optional<Duration> getOptionalDuration(String key) {
+		return optionalGet(key).map((value) -> {
+			try {
+				return TimeUtils.parseDuration(value);
+			} catch (Exception e) { // any parse failure is reported against the offending key, keeping the cause
+				throw new ValidationException("Invalid duration value for key '" + key + "'.", e);
+			}
+		});
+	}
+
+	/**
+	 * Returns a Java {@link Duration} under the given existing key; a missing key fails via {@code exceptionSupplier(key)}.
+	 */
+	public Duration getDuration(String key) {
+		return getOptionalDuration(key).orElseThrow(exceptionSupplier(key));
+	}
+
+	/**
 	 * Returns the property keys of fixed indexed properties.
 	 *
 	 * <p>For example:
@@ -1043,6 +1065,49 @@ public class DescriptorProperties {
 	}
 
 	/**
+	 * Validates a Java {@link Duration} using the bounds 0 and {@link Long#MAX_VALUE} (in milliseconds).
+	 *
+	 * <p>The precision is the granularity in milliseconds: the duration must be a multiple of it (e.g. 1000 would only allow seconds).
+	 */
+	public void validateDuration(String key, boolean isOptional, int precision) {
+		validateDuration(key, isOptional, precision, 0L, Long.MAX_VALUE);
+	}
+
+	/**
+	 * Validates a Java {@link Duration} with a lower bound (inclusive, in milliseconds); the upper bound is {@link Long#MAX_VALUE}.
+	 *
+	 * <p>The precision is the granularity in milliseconds: the duration must be a multiple of it (e.g. 1000 would only allow seconds).
+	 */
+	public void validateDuration(String key, boolean isOptional, int precision, long min) {
+		validateDuration(key, isOptional, precision, min, Long.MAX_VALUE);
+	}
+
+	/**
+	 * Validates a Java {@link Duration}. The boundaries are inclusive and in milliseconds.
+	 *
+	 * <p>The precision (must be greater than 0) is the granularity in milliseconds: the duration must be a multiple of it.
+	 */
+	public void validateDuration(String key, boolean isOptional, int precision, long min, long max) {
+		Preconditions.checkArgument(precision > 0);
+
+		validateComparable(
+			key,
+			isOptional,
+			min,
+			max,
+			"time interval (in milliseconds)",
+			(value) -> {
+				final long ms = TimeUtils.parseDuration(value).toMillis();
+				if (ms % precision != 0) { // reject values finer than the requested granularity
+					throw new ValidationException(
+						"Duration for key '" + key + "' must be a multiple of " + precision + " milliseconds but was: " + value);
+				}
+				return ms; // the millisecond count is what gets compared against min and max
+			}
+		);
+	}
+
+	/**
 	 * Validates an enum property with a set of validation logic for each enum value.
 	 */
 	public void validateEnum(String key, boolean isOptional, Map<String, Consumer<String>> enumValidation) {


[flink] 01/02: [FLINK-13198][core] Add a utility to parse String to Duration

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 768e81f8c72837e41d6dd930308f188b2b827413
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 11 18:54:52 2019 +0800

    [FLINK-13198][core] Add a utility to parse String to Duration
---
 .../main/java/org/apache/flink/util/TimeUtils.java | 128 ++++++++++++++++++++
 .../java/org/apache/flink/util/TimeUtilsTest.java  | 131 +++++++++++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
new file mode 100644
index 0000000..a9a8825
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.time.Duration;
+import java.util.Locale;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Collection of utilities about time intervals.
+ */
+public class TimeUtils {
+
+	/**
+	 * Parses a non-negative whole number with an optional unit ("ms", "s", "min" or "h", in any case;
+	 * milliseconds when the unit is omitted) to a Java {@link Duration}, e.g. "123ms", "321 s", "12min".
+	 *
+	 * @param text string to parse; null, blank, unknown-unit or overflowing input is rejected with an exception.
+	 */
+	public static Duration parseDuration(String text) {
+		checkNotNull(text, "text");
+
+		final String trimmed = text.trim();
+		checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+		final int len = trimmed.length();
+		int pos = 0;
+
+		char current;
+		while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { // consume the leading digits
+			pos++;
+		}
+
+		final String number = trimmed.substring(0, pos);
+		final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+		if (number.isEmpty()) { // also covers a leading sign such as '-', which is not a digit
+			throw new NumberFormatException("text does not start with a number");
+		}
+
+		final long value;
+		try {
+			value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("The value '" + number +
+				"' cannot be represented as a 64bit number (numeric overflow).", e);
+		}
+
+		final long multiplier;
+		if (unit.isEmpty()) {
+			multiplier = 1L; // a bare number is interpreted as milliseconds
+		} else {
+			if (matchTimeUnit(unit, TimeUnit.MILLISECONDS)) {
+				multiplier = 1L;
+			} else if (matchTimeUnit(unit, TimeUnit.SECONDS)) {
+				multiplier = 1000L;
+			} else if (matchTimeUnit(unit, TimeUnit.MINUTES)) {
+				multiplier = 1000L * 60L;
+			} else if (matchTimeUnit(unit, TimeUnit.HOURS)) {
+				multiplier = 1000L * 60L * 60L;
+			} else {
+				throw new IllegalArgumentException("Time interval unit '" + unit +
+					"' does not match any of the recognized units: " + TimeUnit.getAllUnits());
+			}
+		}
+
+		final long result = value * multiplier;
+
+		// check for overflow: dividing back only yields the original value if the product did not wrap
+		if (result / multiplier != value) {
+			throw new IllegalArgumentException("The value '" + text +
+				"' cannot be represented as a 64bit number of milliseconds (numeric overflow).");
+		}
+
+		return Duration.ofMillis(result);
+	}
+
+	private static boolean matchTimeUnit(String text, TimeUnit unit) {
+		return text.equals(unit.getUnit());
+	}
+
+	/**
+	 * Enum which defines time unit, mostly used to parse value from configuration file.
+	 */
+	private enum TimeUnit {
+		MILLISECONDS("ms"),
+		SECONDS("s"),
+		MINUTES("min"),
+		HOURS("h");
+
+		private final String unit;
+
+		TimeUnit(String unit) {
+			this.unit = unit;
+		}
+
+		public String getUnit() {
+			return unit;
+		}
+
+		public static String getAllUnits() {
+			return String.join(" | ", new String[]{
+				MILLISECONDS.getUnit(),
+				SECONDS.getUnit(),
+				MINUTES.getUnit(),
+				HOURS.getUnit()
+			});
+		}
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
new file mode 100644
index 0000000..632cf28
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TimeUtils}.
+ */
+public class TimeUtilsTest {
+
+	@Test
+	public void testParseDurationMillis() {
+		assertEquals(1234, TimeUtils.parseDuration("1234").toMillis()); // no unit means milliseconds
+		assertEquals(1234, TimeUtils.parseDuration("1234ms").toMillis());
+		assertEquals(1234, TimeUtils.parseDuration("1234 ms").toMillis()); // whitespace before the unit is allowed
+	}
+
+	@Test
+	public void testParseDurationSeconds() {
+		assertEquals(667766, TimeUtils.parseDuration("667766s").getSeconds());
+		assertEquals(667766, TimeUtils.parseDuration("667766 s").getSeconds());
+	}
+
+	@Test
+	public void testParseDurationMinutes() {
+		assertEquals(7657623, TimeUtils.parseDuration("7657623min").toMinutes());
+		assertEquals(7657623, TimeUtils.parseDuration("7657623 min").toMinutes());
+	}
+
+	@Test
+	public void testParseDurationHours() {
+		assertEquals(987654, TimeUtils.parseDuration("987654h").toHours());
+		assertEquals(987654, TimeUtils.parseDuration("987654 h").toHours());
+	}
+
+	@Test
+	public void testParseDurationUpperCase() {
+		assertEquals(1L, TimeUtils.parseDuration("1 MS").toMillis()); // units are matched case-insensitively
+		assertEquals(1L, TimeUtils.parseDuration("1 S").getSeconds());
+		assertEquals(1L, TimeUtils.parseDuration("1 MIN").toMinutes());
+		assertEquals(1L, TimeUtils.parseDuration("1 H").toHours());
+	}
+
+	@Test
+	public void testParseDurationTrim() {
+		assertEquals(155L, TimeUtils.parseDuration("      155      ").toMillis()); // surrounding whitespace is ignored
+		assertEquals(155L, TimeUtils.parseDuration("      155      ms   ").toMillis());
+	}
+
+	@Test
+	public void testParseDurationInvalid() {
+		// null input is rejected with a NullPointerException
+		try {
+			TimeUtils.parseDuration(null);
+			fail("exception expected");
+		} catch (NullPointerException ignored) {
+		}
+
+		// empty string is rejected
+		try {
+			TimeUtils.parseDuration("");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// blank (whitespace-only) string is rejected
+		try {
+			TimeUtils.parseDuration("     ");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// no leading number (the NumberFormatException thrown is an IllegalArgumentException)
+		try {
+			TimeUtils.parseDuration("foobar or fubar or foo bazz");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// wrong unit: "gjah" is not one of the recognized units
+		try {
+			TimeUtils.parseDuration("16 gjah");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// multiple numbers: everything after the first number is taken as the unit and does not match
+		try {
+			TimeUtils.parseDuration("16 16 17 18 ms");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// negative number: the leading '-' is not a digit, so no number is found
+		try {
+			TimeUtils.parseDuration("-100 ms");
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testParseDurationNumberOverflow() {
+		TimeUtils.parseDuration("100000000000000000000000000000000 ms"); // the number itself does not fit into a long
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testParseDurationNumberTimeUnitOverflow() {
+		TimeUtils.parseDuration("100000000000000000 h"); // fits into a long, but overflows once converted to milliseconds
+	}
+}