You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/16 09:21:42 UTC
[flink] branch release-1.11 updated: [FLINK-18083][hbase] Improve
exception message of TIMESTAMP/TIME out of the HBase connector supported
precision
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 2ab524b [FLINK-18083][hbase] Improve exception message of TIMESTAMP/TIME out of the HBase connector supported precision
2ab524b is described below
commit 2ab524b6531bdf15249e3815099ef68d357d3685
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Jun 16 17:20:14 2020 +0800
[FLINK-18083][hbase] Improve exception message of TIMESTAMP/TIME out of the HBase connector supported precision
This closes #12627
---
.../flink/connector/hbase/util/HBaseSerde.java | 37 ++++++++++++--
.../flink/connector/hbase/util/HBaseTypeUtils.java | 22 +++++++-
.../hbase/HBaseDynamicTableFactoryTest.java | 58 ++++++++++++++++++++++
3 files changed, 112 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index e5a377f..693cb62 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -52,6 +52,11 @@ public class HBaseSerde {
private static final byte[] EMPTY_BYTES = new byte[]{};
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+ private static final int MAX_TIMESTAMP_PRECISION = 3;
+ private static final int MIN_TIME_PRECISION = 0;
+ private static final int MAX_TIME_PRECISION = 3;
+
private final byte[] nullStringBytes;
// row key index in output row
@@ -276,9 +281,16 @@ public class HBaseSerde {
return (row, pos) -> Bytes.toBytes(row.getShort(pos));
case INTEGER:
case DATE:
- case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
return (row, pos) -> Bytes.toBytes(row.getInt(pos));
+ case TIME_WITHOUT_TIME_ZONE:
+ final int timePrecision = getPrecision(fieldType);
+ if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+ "HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+ }
+ return (row, pos) -> Bytes.toBytes(row.getInt(pos));
case BIGINT:
case INTERVAL_DAY_TIME:
return (row, pos) -> Bytes.toBytes(row.getLong(pos));
@@ -288,7 +300,13 @@ public class HBaseSerde {
return (row, pos) -> Bytes.toBytes(row.getDouble(pos));
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return createTimestampEncoder(getPrecision(fieldType));
+ final int timestampPrecision = getPrecision(fieldType);
+ if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+ "HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+ return createTimestampEncoder(timestampPrecision);
default:
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
}
@@ -367,9 +385,16 @@ public class HBaseSerde {
return Bytes::toShort;
case INTEGER:
case DATE:
- case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
return Bytes::toInt;
+ case TIME_WITHOUT_TIME_ZONE:
+ final int timePrecision = getPrecision(fieldType);
+ if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+ "HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+ }
+ return Bytes::toInt;
case BIGINT:
case INTERVAL_DAY_TIME:
return Bytes::toLong;
@@ -379,6 +404,12 @@ public class HBaseSerde {
return Bytes::toDouble;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(fieldType);
+ if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+ "HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
return createTimestampDecoder();
default:
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
index ec97597..0665e4b 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTypeUtils.java
@@ -42,6 +42,11 @@ public class HBaseTypeUtils {
private static final byte[] EMPTY_BYTES = new byte[]{};
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+ private static final int MAX_TIMESTAMP_PRECISION = 3;
+ private static final int MIN_TIME_PRECISION = 0;
+ private static final int MAX_TIME_PRECISION = 3;
+
/**
* Deserialize byte array to Java Object with the given type.
*/
@@ -184,16 +189,29 @@ public class HBaseTypeUtils {
case SMALLINT:
case INTEGER:
case DATE:
- case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
case BIGINT:
case INTERVAL_DAY_TIME:
case FLOAT:
case DOUBLE:
return true;
+ case TIME_WITHOUT_TIME_ZONE:
+ final int timePrecision = getPrecision(type);
+ if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+ "HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+ }
+ return true;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return getPrecision(type) <= 3;
+ final int timestampPrecision = getPrecision(type);
+ if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+ throw new UnsupportedOperationException(
+ String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by " +
+ "HBase connector", timestampPrecision, MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+ return true;
case TIMESTAMP_WITH_TIME_ZONE:
case ARRAY:
case MULTISET:
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index d51f11b..061114e 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -261,6 +261,64 @@ public class HBaseDynamicTableFactoryTest {
}
}
+ @Test
+ public void testTypeWithUnsupportedPrecision() {
+ Map<String, String> options = getAllOptions();
+ // test unsupported timestamp precision
+ TableSchema schema = TableSchema.builder()
+ .field(ROWKEY, STRING())
+ .field(FAMILY1, ROW(
+ FIELD(COL1, TIMESTAMP(6)),
+ FIELD(COL2, INT())))
+ .build();
+ try {
+ createTableSource(schema, options);
+ fail("Should fail");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils
+ .findThrowableWithMessage(e, "The precision 6 of TIMESTAMP type is out of the range [0, 3]" +
+ " supported by HBase connector")
+ .isPresent());
+ }
+
+ try {
+ createTableSink(schema, options);
+ fail("Should fail");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils
+ .findThrowableWithMessage(e, "The precision 6 of TIMESTAMP type is out of the range [0, 3]" +
+ " supported by HBase connector")
+ .isPresent());
+ }
+ // test unsupported time precision
+ schema = TableSchema.builder()
+ .field(ROWKEY, STRING())
+ .field(FAMILY1, ROW(
+ FIELD(COL1, TIME(6)),
+ FIELD(COL2, INT())))
+ .build();
+
+ try {
+ createTableSource(schema, options);
+ fail("Should fail");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils
+ .findThrowableWithMessage(e, "The precision 6 of TIME type is out of the range [0, 3]" +
+ " supported by HBase connector")
+ .isPresent());
+ }
+
+ try {
+ createTableSink(schema, options);
+ fail("Should fail");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils
+ .findThrowableWithMessage(e, "The precision 6 of TIME type is out of the range [0, 3]" +
+ " supported by HBase connector")
+ .isPresent());
+ }
+ }
+
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "hbase-1.4");