You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/16 09:21:42 UTC

[flink] branch release-1.11 updated: [FLINK-18083][hbase] Improve exception message of TIMESTAMP/TIME out of the HBase connector supported precision

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 2ab524b  [FLINK-18083][hbase] Improve exception message of TIMESTAMP/TIME out of the HBase connector supported precision
2ab524b is described below

commit 2ab524b6531bdf15249e3815099ef68d357d3685
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Jun 16 17:20:14 2020 +0800

    [FLINK-18083][hbase] Improve exception message of TIMESTAMP/TIME out of the HBase connector supported precision
    
    This closes #12627
---
 .../flink/connector/hbase/util/HBaseSerde.java     | 37 ++++++++++++--
 .../flink/connector/hbase/util/HBaseTypeUtils.java | 22 +++++++-
 .../hbase/HBaseDynamicTableFactoryTest.java        | 58 ++++++++++++++++++++++
 3 files changed, 112 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index e5a377f..693cb62 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -52,6 +52,11 @@ public class HBaseSerde {
 
 	private static final byte[] EMPTY_BYTES = new byte[]{};
 
+	private static final int MIN_TIMESTAMP_PRECISION = 0;
+	private static final int MAX_TIMESTAMP_PRECISION = 3;
+	private static final int MIN_TIME_PRECISION = 0;
+	private static final int MAX_TIME_PRECISION = 3;
+
 	private final byte[] nullStringBytes;
 
 	// row key index in output row
@@ -276,9 +281,16 @@ public class HBaseSerde {
 				return (row, pos) -> Bytes.toBytes(row.getShort(pos));
 			case INTEGER:
 			case DATE:
-			case TIME_WITHOUT_TIME_ZONE:
 			case INTERVAL_YEAR_MONTH:
 				return (row, pos) -> Bytes.toBytes(row.getInt(pos));
+			case TIME_WITHOUT_TIME_ZONE:
+				final int timePrecision = getPrecision(fieldType);
+				if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+							"HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+				}
+				return (row, pos) -> Bytes.toBytes(row.getInt(pos));
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return (row, pos) -> Bytes.toBytes(row.getLong(pos));
@@ -288,7 +300,13 @@ public class HBaseSerde {
 				return (row, pos) -> Bytes.toBytes(row.getDouble(pos));
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return createTimestampEncoder(getPrecision(fieldType));
+				final int timestampPrecision = getPrecision(fieldType);
+				if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+							"HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+				}
+				return createTimestampEncoder(timestampPrecision);
 			default:
 				throw new UnsupportedOperationException("Unsupported type: " + fieldType);
 		}
@@ -367,9 +385,16 @@ public class HBaseSerde {
 				return Bytes::toShort;
 			case INTEGER:
 			case DATE:
-			case TIME_WITHOUT_TIME_ZONE:
 			case INTERVAL_YEAR_MONTH:
 				return Bytes::toInt;
+			case TIME_WITHOUT_TIME_ZONE:
+				final int timePrecision = getPrecision(fieldType);
+				if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+							"HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+				}
+				return Bytes::toInt;
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return Bytes::toLong;
@@ -379,6 +404,12 @@ public class HBaseSerde {
 				return Bytes::toDouble;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				final int timestampPrecision = getPrecision(fieldType);
+				if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+							"HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+				}
 				return createTimestampDecoder();
 			default:
 				throw new UnsupportedOperationException("Unsupported type: " + fieldType);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
index ec97597..0665e4b 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
@@ -42,6 +42,11 @@ public class HBaseTypeUtils {
 
 	private static final byte[] EMPTY_BYTES = new byte[]{};
 
+	private static final int MIN_TIMESTAMP_PRECISION = 0;
+	private static final int MAX_TIMESTAMP_PRECISION = 3;
+	private static final int MIN_TIME_PRECISION = 0;
+	private static final int MAX_TIME_PRECISION = 3;
+
 	/**
 	 * Deserialize byte array to Java Object with the given type.
 	 */
@@ -184,16 +189,29 @@ public class HBaseTypeUtils {
 			case SMALLINT:
 			case INTEGER:
 			case DATE:
-			case TIME_WITHOUT_TIME_ZONE:
 			case INTERVAL_YEAR_MONTH:
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 			case FLOAT:
 			case DOUBLE:
 				return true;
+			case TIME_WITHOUT_TIME_ZONE:
+				final int timePrecision = getPrecision(type);
+				if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+							"HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+				}
+				return true;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return getPrecision(type) <= 3;
+				final int timestampPrecision = getPrecision(type);
+				if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+					throw new UnsupportedOperationException(
+						String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+							"HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+				}
+				return true;
 			case TIMESTAMP_WITH_TIME_ZONE:
 			case ARRAY:
 			case MULTISET:
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index d51f11b..061114e 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -261,6 +261,64 @@ public class HBaseDynamicTableFactoryTest {
 		}
 	}
 
+	@Test
+	public void testTypeWithUnsupportedPrecision() {
+		Map<String, String> options = getAllOptions();
+		// test unsupported timestamp precision
+		TableSchema schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.field(FAMILY1, ROW(
+				FIELD(COL1, TIMESTAMP(6)),
+				FIELD(COL2, INT())))
+			.build();
+		try {
+			createTableSource(schema, options);
+			fail("Should fail");
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils
+				.findThrowableWithMessage(e, "The precision 6 of TIMESTAMP type is out of the range [0, 3]" +
+					" supported by HBase connector")
+				.isPresent());
+		}
+
+		try {
+			createTableSink(schema, options);
+			fail("Should fail");
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils
+				.findThrowableWithMessage(e, "The precision 6 of TIMESTAMP type is out of the range [0, 3]" +
+					" supported by HBase connector")
+				.isPresent());
+		}
+		// test unsupported time precision
+		schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.field(FAMILY1, ROW(
+				FIELD(COL1, TIME(6)),
+				FIELD(COL2, INT())))
+			.build();
+
+		try {
+			createTableSource(schema, options);
+			fail("Should fail");
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils
+				.findThrowableWithMessage(e, "The precision 6 of TIME type is out of the range [0, 3]" +
+					" supported by HBase connector")
+				.isPresent());
+		}
+
+		try {
+			createTableSink(schema, options);
+			fail("Should fail");
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils
+				.findThrowableWithMessage(e, "The precision 6 of TIME type is out of the range [0, 3]" +
+					" supported by HBase connector")
+				.isPresent());
+		}
+	}
+
 	private Map<String, String> getAllOptions() {
 		Map<String, String> options = new HashMap<>();
 		options.put("connector", "hbase-1.4");