You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/20 16:24:48 UTC
[flink] branch master updated: [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 19bc181 [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks
19bc181 is described below
commit 19bc18100802e8e5a56c5ce08e985d589db81838
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 17 15:57:05 2021 +0200
[FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks
Similar to the length validation for CHAR/VARCHAR, implement the same logic for
BINARY/VARBINARY and apply any necessary trimming or padding to match the length
specified in the corresponding type.
This closes #18142.
---
.../generated/execution_config_configuration.html | 12 +-
.../table/api/config/ExecutionConfigOptions.java | 31 ++---
.../plan/nodes/exec/common/CommonExecSink.java | 75 ++++++++---
.../nodes/exec/common/CommonExecSinkITCase.java | 128 +++++++++++++++++-
.../runtime/operators/sink/ConstraintEnforcer.java | 146 +++++++++++++++------
5 files changed, 307 insertions(+), 85 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 099a9f9..3d3163d 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -53,12 +53,6 @@ By default no operator is disabled.</td>
<td>Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.</td>
</tr>
<tr>
- <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
- <td style="word-wrap: break-word;">IGNORE</td>
- <td><p>Enum</p></td>
- <td>Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) types will be trimmed or padded (only for CHAR(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defined by the CHAR/VARCHAR length.</li></ul></td>
- </tr>
- <tr>
<td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
@@ -77,6 +71,12 @@ By default no operator is disabled.</td>
<td>Determines how Flink enforces NOT NULL column constraints when inserting null values.<br /><br />Possible values:<ul><li>"ERROR": Throw a runtime exception when writing null values into NOT NULL column.</li><li>"DROP": Drop records silently if a null value would have to be inserted into a NOT NULL column.</li></ul></td>
</tr>
<tr>
+ <td><h5>table.exec.sink.type-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">IGNORE</td>
+ <td><p>Enum</p></td>
+ <td>Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded (only for CHAR(<length>)/BINARY(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR/BINARY/VARBINARY column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.</li><li>"TRIM_PAD": Trim and pad string and binary values to match the length defined by the CHAR/VARCHAR/BINARY/VARBINARY length.</li></ul></td>
+ </tr>
+ <tr>
<td><h5>table.exec.sink.upsert-materialize</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index ba9a6c3..43a9b46 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -120,15 +120,16 @@ public class ExecutionConfigOptions {
"Determines how Flink enforces NOT NULL column constraints when inserting null values.");
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
- public static final ConfigOption<CharLengthEnforcer> TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER =
- key("table.exec.sink.char-length-enforcer")
- .enumType(CharLengthEnforcer.class)
- .defaultValue(CharLengthEnforcer.IGNORE)
+ public static final ConfigOption<TypeLengthEnforcer> TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER =
+ key("table.exec.sink.type-length-enforcer")
+ .enumType(TypeLengthEnforcer.class)
+ .defaultValue(TypeLengthEnforcer.IGNORE)
.withDescription(
- "Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) "
- + "types will be trimmed or padded (only for CHAR(<length>)), so that their "
- + "length will match the one defined by the length of their respective "
- + "CHAR/VARCHAR column type.");
+ "Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)"
+ + "/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded "
+ + "(only for CHAR(<length>)/BINARY(<length>)), so that their length "
+ + "will match the one defined by the length of their respective "
+ + "CHAR/VARCHAR/BINARY/VARBINARY column type.");
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
@@ -440,23 +441,23 @@ public class ExecutionConfigOptions {
}
/**
- * The enforcer to guarantee that length of CHAR/VARCHAR columns is respected when writing data
- * into sink.
+ * The enforcer to guarantee that length of CHAR/VARCHAR/BINARY/VARBINARY columns is respected
+ * when writing data into a sink.
*/
@PublicEvolving
- public enum CharLengthEnforcer implements DescribedEnum {
+ public enum TypeLengthEnforcer implements DescribedEnum {
IGNORE(
text(
"Don't apply any trimming and padding, and instead "
- + "ignore the CHAR/VARCHAR length directive.")),
+ + "ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.")),
TRIM_PAD(
text(
- "Trim and pad string values to match the length "
- + "defined by the CHAR/VARCHAR length."));
+ "Trim and pad string and binary values to match the length "
+ + "defined by the CHAR/VARCHAR/BINARY/VARBINARY length."));
private final InlineElement description;
- CharLengthEnforcer(InlineElement description) {
+ TypeLengthEnforcer(InlineElement description) {
this.description = description;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index cb73eaf..25a0b2b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -66,11 +66,11 @@ import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInsert
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.RowKind;
@@ -211,20 +211,34 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames);
}
+ final ExecutionConfigOptions.TypeLengthEnforcer typeLengthEnforcer =
+ config.getConfiguration()
+ .get(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER);
+
// Build CHAR/VARCHAR length enforcer
- final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo =
- getCharFieldInfo(physicalRowType);
+ final List<ConstraintEnforcer.FieldInfo> charFieldInfo =
+ getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.CHAR);
if (!charFieldInfo.isEmpty()) {
- final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer =
- config.getConfiguration()
- .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER);
final List<String> charFieldNames =
charFieldInfo.stream()
.map(cfi -> fieldNames[cfi.fieldIdx()])
.collect(Collectors.toList());
validatorBuilder.addCharLengthConstraint(
- charLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
+ typeLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
+ }
+
+ // Build BINARY/VARBINARY length enforcer
+ final List<ConstraintEnforcer.FieldInfo> binaryFieldInfo =
+ getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.BINARY);
+ if (!binaryFieldInfo.isEmpty()) {
+ final List<String> binaryFieldNames =
+ binaryFieldInfo.stream()
+ .map(cfi -> fieldNames[cfi.fieldIdx()])
+ .collect(Collectors.toList());
+
+ validatorBuilder.addBinaryLengthConstraint(
+ typeLengthEnforcer, binaryFieldInfo, binaryFieldNames, fieldNames);
}
ConstraintEnforcer constraintEnforcer = validatorBuilder.build();
@@ -257,26 +271,40 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
}
/**
- * Returns a List of {@link ConstraintEnforcer.CharFieldInfo}, each containing the info needed
- * to determine whether a string value needs trimming and/or padding.
+ * Returns a List of {@link ConstraintEnforcer.FieldInfo}, each containing the info needed to
+ * determine whether a string or binary value needs trimming and/or padding.
*/
- private List<ConstraintEnforcer.CharFieldInfo> getCharFieldInfo(RowType physicalType) {
- final List<ConstraintEnforcer.CharFieldInfo> charFieldsAndLengths = new ArrayList<>();
+ private List<ConstraintEnforcer.FieldInfo> getFieldInfoForLengthEnforcer(
+ RowType physicalType, LengthEnforcerType enforcerType) {
+ LogicalTypeRoot staticType = null;
+ LogicalTypeRoot variableType = null;
+ int maxLength = 0;
+ switch (enforcerType) {
+ case CHAR:
+ staticType = LogicalTypeRoot.CHAR;
+ variableType = LogicalTypeRoot.VARCHAR;
+ maxLength = CharType.MAX_LENGTH;
+ break;
+ case BINARY:
+ staticType = LogicalTypeRoot.BINARY;
+ variableType = LogicalTypeRoot.VARBINARY;
+ maxLength = BinaryType.MAX_LENGTH;
+ }
+ final List<ConstraintEnforcer.FieldInfo> fieldsAndLengths = new ArrayList<>();
for (int i = 0; i < physicalType.getFieldCount(); i++) {
LogicalType type = physicalType.getTypeAt(i);
- boolean isChar = type.is(LogicalTypeRoot.CHAR);
+ boolean isStatic = type.is(staticType);
// Should trim and possibly pad
- if ((isChar && (LogicalTypeChecks.getLength(type) < CharType.MAX_LENGTH))
- || (type.is(LogicalTypeRoot.VARCHAR)
- && (LogicalTypeChecks.getLength(type) < VarCharType.MAX_LENGTH))) {
- charFieldsAndLengths.add(
- new ConstraintEnforcer.CharFieldInfo(
- i, LogicalTypeChecks.getLength(type), isChar));
- } else if (isChar) { // Should pad
- charFieldsAndLengths.add(new ConstraintEnforcer.CharFieldInfo(i, null, isChar));
+ if ((isStatic && (LogicalTypeChecks.getLength(type) < maxLength))
+ || (type.is(variableType) && (LogicalTypeChecks.getLength(type) < maxLength))) {
+ fieldsAndLengths.add(
+ new ConstraintEnforcer.FieldInfo(
+ i, LogicalTypeChecks.getLength(type), isStatic));
+ } else if (isStatic) { // Should pad
+ fieldsAndLengths.add(new ConstraintEnforcer.FieldInfo(i, null, isStatic));
}
}
- return charFieldsAndLengths;
+ return fieldsAndLengths;
}
/**
@@ -514,4 +542,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
private RowType getPhysicalRowType(ResolvedSchema schema) {
return (RowType) schema.toPhysicalRowDataType().getLogicalType();
}
+
+ private enum LengthEnforcerType {
+ CHAR,
+ BINARY
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 8714722..5ee26ae 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -58,8 +58,8 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -218,13 +218,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
result.collect().forEachRemaining(results::add);
assertThat(results, containsInAnyOrder(rows.toArray()));
- // Change config option to "trim", to trim the strings based on their type length
+ // Change config option to "trim_pad", to trim or pad the strings
+ // accordingly, based on their type length
try {
tableEnv.getConfig()
.getConfiguration()
.setString(
- TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
- ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name());
+ TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+ ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
result = tableEnv.executeSql("SELECT * FROM T1");
result.await();
@@ -243,8 +244,112 @@ public class CommonExecSinkITCase extends AbstractTestBase {
tableEnv.getConfig()
.getConfiguration()
.setString(
- TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
- ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name());
+ TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+ ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
+ }
+ }
+
+ @Test
+ public void testBinaryLengthEnforcer() throws ExecutionException, InterruptedException {
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ final List<Row> rows =
+ Arrays.asList(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(
+ 2,
+ new byte[] {1, 2, 3, 4, 5},
+ new byte[] {1, 2, 3},
+ 22,
+ 222,
+ new byte[] {1, 2, 3, 4, 5, 6}),
+ Row.of(
+ 3,
+ new byte[] {1, 2, 3, 4, 5, 6},
+ new byte[] {1, 2, 3, 4, 5},
+ 33,
+ 333,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8}),
+ Row.of(
+ 4,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5, 6},
+ 44,
+ 444,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+
+ final TableDescriptor sourceDescriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(schemaForBinaryLengthEnforcer())
+ .source(new TestSource(rows))
+ .build();
+ tableEnv.createTable("T1", sourceDescriptor);
+
+ // Default config - ignore (no trim)
+ TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+ result.await();
+
+ final List<Row> results = new ArrayList<>();
+ result.collect().forEachRemaining(results::add);
+ assertThat(results, containsInAnyOrder(rows.toArray()));
+
+ // Change config option to "trim_pad", to trim or pad the binary values
+ // accordingly, based on their type length
+ try {
+ tableEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+ ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
+
+ result = tableEnv.executeSql("SELECT * FROM T1");
+ result.await();
+
+ final List<Row> expected =
+ Arrays.asList(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5, 6},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(
+ 2,
+ new byte[] {1, 2, 3, 4, 5, 0, 0, 0},
+ new byte[] {1, 2, 3, 0, 0, 0},
+ 22,
+ 222,
+ new byte[] {1, 2, 3, 4, 5, 6}),
+ Row.of(
+ 3,
+ new byte[] {1, 2, 3, 4, 5, 6, 0, 0},
+ new byte[] {1, 2, 3, 4, 5, 0},
+ 33,
+ 333,
+ new byte[] {1, 2, 3, 4, 5, 6}),
+ Row.of(
+ 4,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5, 6},
+ 44,
+ 444,
+ new byte[] {1, 2, 3, 4, 5, 6}));
+ final List<Row> resultsTrimmed = new ArrayList<>();
+ result.collect().forEachRemaining(resultsTrimmed::add);
+ assertThat(resultsTrimmed, containsInAnyOrder(expected.toArray()));
+
+ } finally {
+ tableEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+ ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
}
}
@@ -389,6 +494,17 @@ public class CommonExecSinkITCase extends AbstractTestBase {
.build();
}
+ private static Schema schemaForBinaryLengthEnforcer() {
+ return Schema.newBuilder()
+ .column("a", "INT")
+ .column("b", "BINARY(8)")
+ .column("c", "BINARY(6)")
+ .column("d", "INT")
+ .column("e", "INT")
+ .column("f", "VARBINARY(6)")
+ .build();
+ }
+
private static Schema schemaForNotNullEnforcer() {
return Schema.newBuilder()
.column("a", "INT")
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
index 7822933..35c9888 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -35,11 +34,12 @@ import org.apache.flink.table.runtime.util.StreamRecordCollector;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TypeLengthEnforcer;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -61,10 +61,13 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
private final int[] notNullFieldIndices;
private final String[] allFieldNames;
- private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer;
+ private final TypeLengthEnforcer typeLengthEnforcer;
private final int[] charFieldIndices;
private final int[] charFieldLengths;
- private final BitSet charFieldShouldPad;
+ private final BitSet charFieldCouldPad;
+ private final int[] binaryFieldIndices;
+ private final int[] binaryFieldLengths;
+ private final BitSet binaryFieldCouldPad;
private final String operatorName;
@@ -73,18 +76,24 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
private ConstraintEnforcer(
NotNullEnforcer notNullEnforcer,
int[] notNullFieldIndices,
- ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
+ TypeLengthEnforcer typeLengthEnforcer,
int[] charFieldIndices,
int[] charFieldLengths,
- BitSet charFieldShouldPad,
+ BitSet charFieldCouldPad,
+ int[] binaryFieldIndices,
+ int[] binaryFieldLengths,
+ BitSet binaryFieldCouldPad,
String[] allFieldNames,
String operatorName) {
this.notNullEnforcer = notNullEnforcer;
this.notNullFieldIndices = notNullFieldIndices;
- this.charLengthEnforcer = charLengthEnforcer;
+ this.typeLengthEnforcer = typeLengthEnforcer;
this.charFieldIndices = charFieldIndices;
this.charFieldLengths = charFieldLengths;
- this.charFieldShouldPad = charFieldShouldPad;
+ this.charFieldCouldPad = charFieldCouldPad;
+ this.binaryFieldIndices = binaryFieldIndices;
+ this.binaryFieldLengths = binaryFieldLengths;
+ this.binaryFieldCouldPad = binaryFieldCouldPad;
this.allFieldNames = allFieldNames;
this.operatorName = operatorName;
}
@@ -106,15 +115,17 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
/**
* Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT
- * NULL constraint validation, only the CHAR/VARCHAR length validation, or both.
+ * NULL constraint validation, only the CHAR/VARCHAR length validation, only the
+ * BINARY/VARBINARY length validation, or any combination of them.
*/
public static class Builder {
private NotNullEnforcer notNullEnforcer;
private int[] notNullFieldIndices;
- private CharLengthEnforcer charLengthEnforcer;
- private List<CharFieldInfo> charFieldInfo;
+ private TypeLengthEnforcer typeLengthEnforcer;
+ private List<FieldInfo> charFieldInfo;
+ private List<FieldInfo> binaryFieldInfo;
private String[] allFieldNames;
private final List<String> operatorNames = new ArrayList<>();
@@ -142,12 +153,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
}
public void addCharLengthConstraint(
- ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
- List<CharFieldInfo> charFieldInfo,
+ TypeLengthEnforcer typeLengthEnforcer,
+ List<FieldInfo> charFieldInfo,
List<String> charFieldNames,
String[] allFieldNames) {
- this.charLengthEnforcer = charLengthEnforcer;
- if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) {
+ this.typeLengthEnforcer = typeLengthEnforcer;
+ if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) {
checkArgument(
charFieldInfo.size() > 0,
"ConstraintValidator requires that there are CHAR/VARCHAR fields.");
@@ -156,14 +167,35 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
operatorNames.add(
String.format(
- "CharLengthEnforcer(fields=[%s])",
- String.join(", ", charFieldNames)));
+ "LengthEnforcer(fields=[%s])", String.join(", ", charFieldNames)));
+ this.isConfigured = true;
+ }
+ }
+
+ public void addBinaryLengthConstraint(
+ TypeLengthEnforcer typeLengthEnforcer,
+ List<FieldInfo> binaryFieldInfo,
+ List<String> binaryFieldNames,
+ String[] allFieldNames) {
+ this.typeLengthEnforcer = typeLengthEnforcer;
+ if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) {
+ checkArgument(
+ binaryFieldInfo.size() > 0,
+ "ConstraintValidator requires that there are BINARY/VARBINARY fields.");
+ this.binaryFieldInfo = binaryFieldInfo;
+ this.allFieldNames = allFieldNames;
+
+ operatorNames.add(
+ String.format(
+ "LengthEnforcer(fields=[%s])",
+ String.join(", ", binaryFieldNames)));
this.isConfigured = true;
}
}
/**
- * If neither of NOT NULL or CHAR/VARCHAR length enforcers are configured, null is returned.
+ * If none of the NOT NULL, CHAR/VARCHAR length or BINARY/VARBINARY length enforcers
+ * is configured, null is returned.
*/
public ConstraintEnforcer build() {
if (isConfigured) {
@@ -172,14 +204,21 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
return new ConstraintEnforcer(
notNullEnforcer,
notNullFieldIndices,
- charLengthEnforcer,
+ typeLengthEnforcer,
charFieldInfo != null
- ? charFieldInfo.stream().mapToInt(cfi -> cfi.fieldIdx).toArray()
+ ? charFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray()
: null,
charFieldInfo != null
- ? charFieldInfo.stream().mapToInt(cfi -> cfi.length).toArray()
+ ? charFieldInfo.stream().mapToInt(fi -> fi.length).toArray()
+ : null,
+ charFieldInfo != null ? buildCouldPad(charFieldInfo) : null,
+ binaryFieldInfo != null
+ ? binaryFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray()
: null,
- charFieldInfo != null ? buildShouldPad(charFieldInfo) : null,
+ binaryFieldInfo != null
+ ? binaryFieldInfo.stream().mapToInt(fi -> fi.length).toArray()
+ : null,
+ binaryFieldInfo != null ? buildCouldPad(binaryFieldInfo) : null,
allFieldNames,
operatorName);
}
@@ -187,21 +226,25 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
}
}
- private static BitSet buildShouldPad(List<CharFieldInfo> charFieldInfo) {
- BitSet shouldPad = new BitSet(charFieldInfo.size());
+ private static BitSet buildCouldPad(List<FieldInfo> charFieldInfo) {
+ BitSet couldPad = new BitSet(charFieldInfo.size());
for (int i = 0; i < charFieldInfo.size(); i++) {
- if (charFieldInfo.get(i).shouldPad) {
- shouldPad.set(i);
+ if (charFieldInfo.get(i).couldPad) {
+ couldPad.set(i);
}
}
- return shouldPad;
+ return couldPad;
}
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
RowData processedRowData = processNotNullConstraint(element.getValue());
+ // If NOT NULL constraint is not respected don't proceed to process the other constraints,
+ // simply drop the record.
if (processedRowData != null) {
- collector.collect(processCharConstraint(processedRowData));
+ processedRowData = processCharConstraint(processedRowData);
+ processedRowData = processBinaryConstraint(processedRowData);
+ collector.collect(processedRowData);
}
}
@@ -231,8 +274,9 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
}
private RowData processCharConstraint(RowData rowData) {
- if (charLengthEnforcer == null
- || charLengthEnforcer == ExecutionConfigOptions.CharLengthEnforcer.IGNORE) {
+ if (typeLengthEnforcer == null
+ || typeLengthEnforcer == TypeLengthEnforcer.IGNORE
+ || charFieldIndices == null) {
return rowData;
}
@@ -244,7 +288,7 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx);
final int sourceStrLength = stringData.numChars();
- if (charFieldShouldPad.get(i) && sourceStrLength < length) {
+ if (charFieldCouldPad.get(i) && sourceStrLength < length) {
if (updatedRowData == null) {
updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
}
@@ -266,20 +310,48 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
return updatedRowData != null ? updatedRowData : rowData;
}
+ private RowData processBinaryConstraint(RowData rowData) {
+ if (typeLengthEnforcer == null
+ || typeLengthEnforcer == TypeLengthEnforcer.IGNORE
+ || binaryFieldIndices == null) {
+ return rowData;
+ }
+
+ UpdatableRowData updatedRowData = null;
+
+ for (int i = 0; i < binaryFieldLengths.length; i++) {
+ final int fieldIdx = binaryFieldIndices[i];
+ final int length = binaryFieldLengths[i];
+ final byte[] binaryData = rowData.getBinary(fieldIdx);
+ final int sourceLength = binaryData.length;
+
+ // Trimming takes place because of the shorter length used in `Arrays.copyOf`, and
+ // padding because of the longer length, as implicitly the trailing bytes are 0.
+ if ((sourceLength > length) || (binaryFieldCouldPad.get(i) && sourceLength < length)) {
+ if (updatedRowData == null) {
+ updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
+ }
+ updatedRowData.setField(fieldIdx, Arrays.copyOf(binaryData, length));
+ }
+ }
+
+ return updatedRowData != null ? updatedRowData : rowData;
+ }
+
/**
- * Helper POJO to keep info about CHAR/VARCHAR Fields, used to determine if trimming or padding
- * is needed.
+ * Helper POJO to keep info about CHAR/VARCHAR/BINARY/VARBINARY fields, used to determine if
+ * trimming or padding is needed.
*/
@Internal
- public static class CharFieldInfo {
+ public static class FieldInfo {
private final int fieldIdx;
private final Integer length;
- private final boolean shouldPad;
+ private final boolean couldPad;
- public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean shouldPad) {
+ public FieldInfo(int fieldIdx, @Nullable Integer length, boolean couldPad) {
this.fieldIdx = fieldIdx;
this.length = length;
- this.shouldPad = shouldPad;
+ this.couldPad = couldPad;
}
public int fieldIdx() {