You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/20 16:24:48 UTC

[flink] branch master updated: [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 19bc181  [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks
19bc181 is described below

commit 19bc18100802e8e5a56c5ce08e985d589db81838
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 17 15:57:05 2021 +0200

    [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks
    
    Similar to the length validation for CHAR/VARCHAR, implement the same logic for
    BINARY/VARBINARY and apply any necessary trimming or padding to match the length
    specified in the corresponding type.
    
    This closes #18142.
---
 .../generated/execution_config_configuration.html  |  12 +-
 .../table/api/config/ExecutionConfigOptions.java   |  31 ++---
 .../plan/nodes/exec/common/CommonExecSink.java     |  75 ++++++++---
 .../nodes/exec/common/CommonExecSinkITCase.java    | 128 +++++++++++++++++-
 .../runtime/operators/sink/ConstraintEnforcer.java | 146 +++++++++++++++------
 5 files changed, 307 insertions(+), 85 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 099a9f9..3d3163d 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -53,12 +53,6 @@ By default no operator is disabled.</td>
             <td>Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.</td>
         </tr>
         <tr>
-            <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
-            <td style="word-wrap: break-word;">IGNORE</td>
-            <td><p>Enum</p></td>
-            <td>Determines whether string values for columns with CHAR(&lt;length&gt;)/VARCHAR(&lt;length&gt;) types will be trimmed or padded (only for CHAR(&lt;length&gt;)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defined by the CHAR/VARCHAR length.</li></ul></td>
-        </tr>
-        <tr>
             <td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">AUTO</td>
             <td><p>Enum</p></td>
@@ -77,6 +71,12 @@ By default no operator is disabled.</td>
             <td>Determines how Flink enforces NOT NULL column constraints when inserting null values.<br /><br />Possible values:<ul><li>"ERROR": Throw a runtime exception when writing null values into NOT NULL column.</li><li>"DROP": Drop records silently if a null value would have to be inserted into a NOT NULL column.</li></ul></td>
         </tr>
         <tr>
+            <td><h5>table.exec.sink.type-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">IGNORE</td>
+            <td><p>Enum</p></td>
+            <td>Determines whether values for columns with CHAR(&lt;length&gt;)/VARCHAR(&lt;length&gt;)/BINARY(&lt;length&gt;)/VARBINARY(&lt;length&gt;) types will be trimmed or padded (only for CHAR(&lt;length&gt;)/BINARY(&lt;length&gt;)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR/BINARY/VARBINARY column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.</li><li>"TRIM_PAD": Trim and pad string and binary values to match the length defined by the CHAR/VARCHAR/BINARY/VARBINARY length.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>table.exec.sink.upsert-materialize</h5><br> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">AUTO</td>
             <td><p>Enum</p></td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index ba9a6c3..43a9b46 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -120,15 +120,16 @@ public class ExecutionConfigOptions {
                             "Determines how Flink enforces NOT NULL column constraints when inserting null values.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<CharLengthEnforcer> TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER =
-            key("table.exec.sink.char-length-enforcer")
-                    .enumType(CharLengthEnforcer.class)
-                    .defaultValue(CharLengthEnforcer.IGNORE)
+    public static final ConfigOption<TypeLengthEnforcer> TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER =
+            key("table.exec.sink.type-length-enforcer")
+                    .enumType(TypeLengthEnforcer.class)
+                    .defaultValue(TypeLengthEnforcer.IGNORE)
                     .withDescription(
-                            "Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) "
-                                    + "types will be trimmed or padded (only for CHAR(<length>)), so that their "
-                                    + "length will match the one defined by the length of their respective "
-                                    + "CHAR/VARCHAR column type.");
+                            "Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)"
+                                    + "/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded "
+                                    + "(only for CHAR(<length>)/BINARY(<length>)), so that their length "
+                                    + "will match the one defined by the length of their respective "
+                                    + "CHAR/VARCHAR/BINARY/VARBINARY column type.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
@@ -440,23 +441,23 @@ public class ExecutionConfigOptions {
     }
 
     /**
-     * The enforcer to guarantee that length of CHAR/VARCHAR columns is respected when writing data
-     * into sink.
+     * The enforcer to guarantee that length of CHAR/VARCHAR/BINARY/VARBINARY columns is respected
+     * when writing data into a sink.
      */
     @PublicEvolving
-    public enum CharLengthEnforcer implements DescribedEnum {
+    public enum TypeLengthEnforcer implements DescribedEnum {
         IGNORE(
                 text(
                         "Don't apply any trimming and padding, and instead "
-                                + "ignore the CHAR/VARCHAR length directive.")),
+                                + "ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.")),
         TRIM_PAD(
                 text(
-                        "Trim and pad string values to match the length "
-                                + "defined by the CHAR/VARCHAR length."));
+                        "Trim and pad string and binary values to match the length "
+                                + "defined by the CHAR/VARCHAR/BINARY/VARBINARY length."));
 
         private final InlineElement description;
 
-        CharLengthEnforcer(InlineElement description) {
+        TypeLengthEnforcer(InlineElement description) {
             this.description = description;
         }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index cb73eaf..25a0b2b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -66,11 +66,11 @@ import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInsert
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.RowKind;
 
@@ -211,20 +211,34 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                     notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames);
         }
 
+        final ExecutionConfigOptions.TypeLengthEnforcer typeLengthEnforcer =
+                config.getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER);
+
         // Build CHAR/VARCHAR length enforcer
-        final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo =
-                getCharFieldInfo(physicalRowType);
+        final List<ConstraintEnforcer.FieldInfo> charFieldInfo =
+                getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.CHAR);
         if (!charFieldInfo.isEmpty()) {
-            final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer =
-                    config.getConfiguration()
-                            .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER);
             final List<String> charFieldNames =
                     charFieldInfo.stream()
                             .map(cfi -> fieldNames[cfi.fieldIdx()])
                             .collect(Collectors.toList());
 
             validatorBuilder.addCharLengthConstraint(
-                    charLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
+                    typeLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
+        }
+
+        // Build BINARY/VARBINARY length enforcer
+        final List<ConstraintEnforcer.FieldInfo> binaryFieldInfo =
+                getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.BINARY);
+        if (!binaryFieldInfo.isEmpty()) {
+            final List<String> binaryFieldNames =
+                    binaryFieldInfo.stream()
+                            .map(cfi -> fieldNames[cfi.fieldIdx()])
+                            .collect(Collectors.toList());
+
+            validatorBuilder.addBinaryLengthConstraint(
+                    typeLengthEnforcer, binaryFieldInfo, binaryFieldNames, fieldNames);
         }
 
         ConstraintEnforcer constraintEnforcer = validatorBuilder.build();
@@ -257,26 +271,40 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     }
 
     /**
-     * Returns a List of {@link ConstraintEnforcer.CharFieldInfo}, each containing the info needed
-     * to determine whether a string value needs trimming and/or padding.
+     * Returns a List of {@link ConstraintEnforcer.FieldInfo}, each containing the info needed to
+     * determine whether a string or binary value needs trimming and/or padding.
      */
-    private List<ConstraintEnforcer.CharFieldInfo> getCharFieldInfo(RowType physicalType) {
-        final List<ConstraintEnforcer.CharFieldInfo> charFieldsAndLengths = new ArrayList<>();
+    private List<ConstraintEnforcer.FieldInfo> getFieldInfoForLengthEnforcer(
+            RowType physicalType, LengthEnforcerType enforcerType) {
+        LogicalTypeRoot staticType = null;
+        LogicalTypeRoot variableType = null;
+        int maxLength = 0;
+        switch (enforcerType) {
+            case CHAR:
+                staticType = LogicalTypeRoot.CHAR;
+                variableType = LogicalTypeRoot.VARCHAR;
+                maxLength = CharType.MAX_LENGTH;
+                break;
+            case BINARY:
+                staticType = LogicalTypeRoot.BINARY;
+                variableType = LogicalTypeRoot.VARBINARY;
+                maxLength = BinaryType.MAX_LENGTH;
+        }
+        final List<ConstraintEnforcer.FieldInfo> fieldsAndLengths = new ArrayList<>();
         for (int i = 0; i < physicalType.getFieldCount(); i++) {
             LogicalType type = physicalType.getTypeAt(i);
-            boolean isChar = type.is(LogicalTypeRoot.CHAR);
+            boolean isStatic = type.is(staticType);
             // Should trim and possibly pad
-            if ((isChar && (LogicalTypeChecks.getLength(type) < CharType.MAX_LENGTH))
-                    || (type.is(LogicalTypeRoot.VARCHAR)
-                            && (LogicalTypeChecks.getLength(type) < VarCharType.MAX_LENGTH))) {
-                charFieldsAndLengths.add(
-                        new ConstraintEnforcer.CharFieldInfo(
-                                i, LogicalTypeChecks.getLength(type), isChar));
-            } else if (isChar) { // Should pad
-                charFieldsAndLengths.add(new ConstraintEnforcer.CharFieldInfo(i, null, isChar));
+            if ((isStatic && (LogicalTypeChecks.getLength(type) < maxLength))
+                    || (type.is(variableType) && (LogicalTypeChecks.getLength(type) < maxLength))) {
+                fieldsAndLengths.add(
+                        new ConstraintEnforcer.FieldInfo(
+                                i, LogicalTypeChecks.getLength(type), isStatic));
+            } else if (isStatic) { // Should pad
+                fieldsAndLengths.add(new ConstraintEnforcer.FieldInfo(i, null, isStatic));
             }
         }
-        return charFieldsAndLengths;
+        return fieldsAndLengths;
     }
 
     /**
@@ -514,4 +542,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     private RowType getPhysicalRowType(ResolvedSchema schema) {
         return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
+
+    private enum LengthEnforcerType {
+        CHAR,
+        BINARY
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 8714722..5ee26ae 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -58,8 +58,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -218,13 +218,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         result.collect().forEachRemaining(results::add);
         assertThat(results, containsInAnyOrder(rows.toArray()));
 
-        // Change config option to "trim", to trim the strings based on their type length
+        // Change config option to "trim_pad", to trim or pad the strings
+        // accordingly, based on their type length
         try {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
-                            ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name());
+                            TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
 
             result = tableEnv.executeSql("SELECT * FROM T1");
             result.await();
@@ -243,8 +244,112 @@ public class CommonExecSinkITCase extends AbstractTestBase {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
-                            ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name());
+                            TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
+        }
+    }
+
+    @Test
+    public void testBinaryLengthEnforcer() throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(
+                                1,
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                11,
+                                111,
+                                new byte[] {1, 2, 3}),
+                        Row.of(
+                                2,
+                                new byte[] {1, 2, 3, 4, 5},
+                                new byte[] {1, 2, 3},
+                                22,
+                                222,
+                                new byte[] {1, 2, 3, 4, 5, 6}),
+                        Row.of(
+                                3,
+                                new byte[] {1, 2, 3, 4, 5, 6},
+                                new byte[] {1, 2, 3, 4, 5},
+                                33,
+                                333,
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8}),
+                        Row.of(
+                                4,
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                new byte[] {1, 2, 3, 4, 5, 6},
+                                44,
+                                444,
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForBinaryLengthEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<Row> results = new ArrayList<>();
+        result.collect().forEachRemaining(results::add);
+        assertThat(results, containsInAnyOrder(rows.toArray()));
+
+        // Change config option to "trim_pad", to trim or pad the binary values
+        // accordingly, based on their type length
+        try {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
+
+            result = tableEnv.executeSql("SELECT * FROM T1");
+            result.await();
+
+            final List<Row> expected =
+                    Arrays.asList(
+                            Row.of(
+                                    1,
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                    new byte[] {1, 2, 3, 4, 5, 6},
+                                    11,
+                                    111,
+                                    new byte[] {1, 2, 3}),
+                            Row.of(
+                                    2,
+                                    new byte[] {1, 2, 3, 4, 5, 0, 0, 0},
+                                    new byte[] {1, 2, 3, 0, 0, 0},
+                                    22,
+                                    222,
+                                    new byte[] {1, 2, 3, 4, 5, 6}),
+                            Row.of(
+                                    3,
+                                    new byte[] {1, 2, 3, 4, 5, 6, 0, 0},
+                                    new byte[] {1, 2, 3, 4, 5, 0},
+                                    33,
+                                    333,
+                                    new byte[] {1, 2, 3, 4, 5, 6}),
+                            Row.of(
+                                    4,
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                    new byte[] {1, 2, 3, 4, 5, 6},
+                                    44,
+                                    444,
+                                    new byte[] {1, 2, 3, 4, 5, 6}));
+            final List<Row> resultsTrimmed = new ArrayList<>();
+            result.collect().forEachRemaining(resultsTrimmed::add);
+            assertThat(resultsTrimmed, containsInAnyOrder(expected.toArray()));
+
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
         }
     }
 
@@ -389,6 +494,17 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                 .build();
     }
 
+    private static Schema schemaForBinaryLengthEnforcer() {
+        return Schema.newBuilder()
+                .column("a", "INT")
+                .column("b", "BINARY(8)")
+                .column("c", "BINARY(6)")
+                .column("d", "INT")
+                .column("e", "INT")
+                .column("f", "VARBINARY(6)")
+                .build();
+    }
+
     private static Schema schemaForNotNullEnforcer() {
         return Schema.newBuilder()
                 .column("a", "INT")
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
index 7822933..35c9888 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -35,11 +34,12 @@ import org.apache.flink.table.runtime.util.StreamRecordCollector;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TypeLengthEnforcer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -61,10 +61,13 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     private final int[] notNullFieldIndices;
     private final String[] allFieldNames;
 
-    private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer;
+    private final TypeLengthEnforcer typeLengthEnforcer;
     private final int[] charFieldIndices;
     private final int[] charFieldLengths;
-    private final BitSet charFieldShouldPad;
+    private final BitSet charFieldCouldPad;
+    private final int[] binaryFieldIndices;
+    private final int[] binaryFieldLengths;
+    private final BitSet binaryFieldCouldPad;
 
     private final String operatorName;
 
@@ -73,18 +76,24 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     private ConstraintEnforcer(
             NotNullEnforcer notNullEnforcer,
             int[] notNullFieldIndices,
-            ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
+            TypeLengthEnforcer typeLengthEnforcer,
             int[] charFieldIndices,
             int[] charFieldLengths,
-            BitSet charFieldShouldPad,
+            BitSet charFieldCouldPad,
+            int[] binaryFieldIndices,
+            int[] binaryFieldLengths,
+            BitSet binaryFieldCouldPad,
             String[] allFieldNames,
             String operatorName) {
         this.notNullEnforcer = notNullEnforcer;
         this.notNullFieldIndices = notNullFieldIndices;
-        this.charLengthEnforcer = charLengthEnforcer;
+        this.typeLengthEnforcer = typeLengthEnforcer;
         this.charFieldIndices = charFieldIndices;
         this.charFieldLengths = charFieldLengths;
-        this.charFieldShouldPad = charFieldShouldPad;
+        this.charFieldCouldPad = charFieldCouldPad;
+        this.binaryFieldIndices = binaryFieldIndices;
+        this.binaryFieldLengths = binaryFieldLengths;
+        this.binaryFieldCouldPad = binaryFieldCouldPad;
         this.allFieldNames = allFieldNames;
         this.operatorName = operatorName;
     }
@@ -106,15 +115,17 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
 
     /**
      * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT
-     * NULL constraint validation, only the CHAR/VARCHAR length validation, or both.
+     * NULL constraint validation, only the CHAR/VARCHAR length validation, only the
+     * BINARY/VARBINARY length validation, or any combination of them.
      */
     public static class Builder {
 
         private NotNullEnforcer notNullEnforcer;
         private int[] notNullFieldIndices;
 
-        private CharLengthEnforcer charLengthEnforcer;
-        private List<CharFieldInfo> charFieldInfo;
+        private TypeLengthEnforcer typeLengthEnforcer;
+        private List<FieldInfo> charFieldInfo;
+        private List<FieldInfo> binaryFieldInfo;
         private String[] allFieldNames;
 
         private final List<String> operatorNames = new ArrayList<>();
@@ -142,12 +153,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
         }
 
         public void addCharLengthConstraint(
-                ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
-                List<CharFieldInfo> charFieldInfo,
+                TypeLengthEnforcer typeLengthEnforcer,
+                List<FieldInfo> charFieldInfo,
                 List<String> charFieldNames,
                 String[] allFieldNames) {
-            this.charLengthEnforcer = charLengthEnforcer;
-            if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) {
+            this.typeLengthEnforcer = typeLengthEnforcer;
+            if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) {
                 checkArgument(
                         charFieldInfo.size() > 0,
                         "ConstraintValidator requires that there are CHAR/VARCHAR fields.");
@@ -156,14 +167,35 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
 
                 operatorNames.add(
                         String.format(
-                                "CharLengthEnforcer(fields=[%s])",
-                                String.join(", ", charFieldNames)));
+                                "LengthEnforcer(fields=[%s])", String.join(", ", charFieldNames)));
+                this.isConfigured = true;
+            }
+        }
+
+        public void addBinaryLengthConstraint(
+                TypeLengthEnforcer typeLengthEnforcer,
+                List<FieldInfo> binaryFieldInfo,
+                List<String> binaryFieldNames,
+                String[] allFieldNames) {
+            this.typeLengthEnforcer = typeLengthEnforcer;
+            if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) {
+                checkArgument(
+                        binaryFieldInfo.size() > 0,
+                        "ConstraintValidator requires that there are BINARY/VARBINARY fields.");
+                this.binaryFieldInfo = binaryFieldInfo;
+                this.allFieldNames = allFieldNames;
+
+                operatorNames.add(
+                        String.format(
+                                "LengthEnforcer(fields=[%s])",
+                                String.join(", ", binaryFieldNames)));
                 this.isConfigured = true;
             }
         }
 
         /**
-         * If neither of NOT NULL or CHAR/VARCHAR length enforcers are configured, null is returned.
+         * If none of the NOT NULL, CHAR/VARCHAR length, or BINARY/VARBINARY length enforcers
+         * is configured, null is returned.
          */
         public ConstraintEnforcer build() {
             if (isConfigured) {
@@ -172,14 +204,21 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
                 return new ConstraintEnforcer(
                         notNullEnforcer,
                         notNullFieldIndices,
-                        charLengthEnforcer,
+                        typeLengthEnforcer,
                         charFieldInfo != null
-                                ? charFieldInfo.stream().mapToInt(cfi -> cfi.fieldIdx).toArray()
+                                ? charFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray()
                                 : null,
                         charFieldInfo != null
-                                ? charFieldInfo.stream().mapToInt(cfi -> cfi.length).toArray()
+                                ? charFieldInfo.stream().mapToInt(fi -> fi.length).toArray()
+                                : null,
+                        charFieldInfo != null ? buildCouldPad(charFieldInfo) : null,
+                        binaryFieldInfo != null
+                                ? binaryFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray()
                                 : null,
-                        charFieldInfo != null ? buildShouldPad(charFieldInfo) : null,
+                        binaryFieldInfo != null
+                                ? binaryFieldInfo.stream().mapToInt(fi -> fi.length).toArray()
+                                : null,
+                        binaryFieldInfo != null ? buildCouldPad(binaryFieldInfo) : null,
                         allFieldNames,
                         operatorName);
             }
@@ -187,21 +226,25 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
         }
     }
 
-    private static BitSet buildShouldPad(List<CharFieldInfo> charFieldInfo) {
-        BitSet shouldPad = new BitSet(charFieldInfo.size());
-        for (int i = 0; i < charFieldInfo.size(); i++) {
-            if (charFieldInfo.get(i).shouldPad) {
-                shouldPad.set(i);
-            }
-        }
-        return shouldPad;
+    /** Returns a bitmask whose bit {@code i} is set iff {@code fieldInfos.get(i).couldPad}. */
+    private static BitSet buildCouldPad(List<FieldInfo> fieldInfos) {
+        final int size = fieldInfos.size();
+        final BitSet couldPad = new BitSet(size);
+        for (int i = 0; i < size; i++) {
+            couldPad.set(i, fieldInfos.get(i).couldPad);
+        }
+        return couldPad;
     }
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        RowData processedRowData = processNotNullConstraint(element.getValue());
-        if (processedRowData != null) {
-            collector.collect(processCharConstraint(processedRowData));
-        }
+        final RowData notNullChecked = processNotNullConstraint(element.getValue());
+        // A null result means the record failed the NOT NULL check and is dropped, so the
+        // remaining length constraints are not evaluated for it.
+        if (notNullChecked == null) {
+            return;
+        }
+        final RowData charChecked = processCharConstraint(notNullChecked);
+        collector.collect(processBinaryConstraint(charChecked));
     }
 
@@ -231,8 +274,9 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     }
 
     private RowData processCharConstraint(RowData rowData) {
-        if (charLengthEnforcer == null
-                || charLengthEnforcer == ExecutionConfigOptions.CharLengthEnforcer.IGNORE) {
+        if (typeLengthEnforcer == null
+                || typeLengthEnforcer == TypeLengthEnforcer.IGNORE
+                || charFieldIndices == null) {
             return rowData;
         }
 
@@ -244,7 +288,7 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
             final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx);
             final int sourceStrLength = stringData.numChars();
 
-            if (charFieldShouldPad.get(i) && sourceStrLength < length) {
+            if (charFieldCouldPad.get(i) && sourceStrLength < length) {
                 if (updatedRowData == null) {
                     updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
                 }
@@ -266,20 +310,63 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
         return updatedRowData != null ? updatedRowData : rowData;
     }
 
+    /**
+     * Enforces the declared length of BINARY/VARBINARY fields: values longer than the declared
+     * length are trimmed, and shorter values are padded with trailing zero bytes if the field's
+     * pad flag is set.
+     *
+     * <p>NULL values are skipped since they have no length to enforce.
+     *
+     * @return the given row if no field needed adjustment, otherwise an {@link UpdatableRowData}
+     *     wrapping it that holds the adjusted values
+     */
+    private RowData processBinaryConstraint(RowData rowData) {
+        if (typeLengthEnforcer == null
+                || typeLengthEnforcer == TypeLengthEnforcer.IGNORE
+                || binaryFieldIndices == null) {
+            return rowData;
+        }
+
+        UpdatableRowData updatedRowData = null;
+
+        for (int i = 0; i < binaryFieldIndices.length; i++) {
+            final int fieldIdx = binaryFieldIndices[i];
+            // A nullable column may hold NULL, which has no length to enforce; skip it instead
+            // of reading it with getBinary() and dereferencing the result.
+            if (rowData.isNullAt(fieldIdx)) {
+                continue;
+            }
+            final int length = binaryFieldLengths[i];
+            final byte[] binaryData = rowData.getBinary(fieldIdx);
+            final int sourceLength = binaryData.length;
+
+            // Arrays.copyOf trims when the target length is shorter and pads when it is longer,
+            // as the extra trailing bytes are implicitly 0.
+            if (sourceLength > length || (binaryFieldCouldPad.get(i) && sourceLength < length)) {
+                if (updatedRowData == null) {
+                    updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
+                }
+                updatedRowData.setField(fieldIdx, Arrays.copyOf(binaryData, length));
+            }
+        }
+
+        return updatedRowData != null ? updatedRowData : rowData;
+    }
+
     /**
-     * Helper POJO to keep info about CHAR/VARCHAR Fields, used to determine if trimming or padding
-     * is needed.
+     * Helper POJO to keep info about CHAR/VARCHAR/BINARY/VARBINARY fields, used to determine if
+     * trimming or padding is needed.
      */
     @Internal
-    public static class CharFieldInfo {
+    public static class FieldInfo {
         private final int fieldIdx;
         private final Integer length;
-        private final boolean shouldPad;
+        private final boolean couldPad;
 
-        public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean shouldPad) {
+        public FieldInfo(int fieldIdx, @Nullable Integer length, boolean couldPad) {
             this.fieldIdx = fieldIdx;
             this.length = length;
-            this.shouldPad = shouldPad;
+            this.couldPad = couldPad;
         }
 
         public int fieldIdx() {