You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/08/18 01:43:05 UTC

[GitHub] [rocketmq-externals] duhenglucky commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

duhenglucky commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r690837086



##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value..
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + "field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {

Review comment:
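       Keep the code format consistent with the other branches: the other `case` bodies are not wrapped in braces, so please remove the `{ ... }` around this one.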

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value..
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + "field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();
+        }
+    }
+
+    private String[] handleFieldIncrement(String[] data) {
+        switch (fieldIncrementStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field increment error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                return data;
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       Same as the comment above.

##########
File path: rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
+public class RocketMQDynamicTableSourceFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(Column.physical("name", STRING().notNull())),
+                    new ArrayList<>(),
+                    null);
+
+    private static final String IDENTIFIER = "rocketmq";
+    private static final String TOPIC = "test_source";
+    private static final String CONSUMER_GROUP = "test_consumer";
+    private static final String NAME_SERVER_ADDRESS =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";

Review comment:
       Please do not include any vendor-specific code, such as the Aliyun-specific name server address above; use a generic placeholder address instead.

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value..
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Converts a message into a row. The key is not used.
+     *
+     * <p>A table with a single binary column receives the raw body unchanged (including {@code
+     * null}); otherwise a {@code null} body is logged and dropped, and any other body is parsed.
+     */
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (!isOnlyHaveVarbinaryDataField()) {
+            if (value != null) {
+                return deserializeValue(value);
+            }
+            logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+            return null;
+        }
+        GenericRowData rawRow = new GenericRowData(columnSize);
+        rawRow.setField(0, value);
+        return rawRow;
+    }
+
+    /**
+     * Returns the row type derived from the table schema.
+     *
+     * <p>NOTE(review): reads the transient {@code tableSchema}, so this only works on the
+     * instance built by the constructor, not on a Java-deserialized copy — confirm it is only
+     * called client-side.
+     */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+        return InternalTypeInfo.of(rowType);
+    }
+
+    /**
+     * Returns whether the table has exactly one column and that column maps to {@link
+     * ByteSerializer.ValueType#V_ByteArray}, in which case the raw message body is forwarded.
+     *
+     * <p>Uses the serializable {@code fieldTypes} array computed in the constructor instead of the
+     * transient {@code tableSchema}, which is {@code null} after Java deserialization and made
+     * this per-message check throw a NullPointerException for single-column tables.
+     */
+    private boolean isOnlyHaveVarbinaryDataField() {
+        return columnSize == 1 && fieldTypes[0] == ByteSerializer.ValueType.V_ByteArray;
+    }
+
+    /**
+     * Decodes the body with {@code encoding}, splits it into column tokens and converts each
+     * token into the matching column of a {@link GenericRowData}.
+     *
+     * @param value the raw message body (non-null; the caller filters {@code null})
+     * @return the converted row, or {@code null} when the message is dropped by the field-count
+     *     handling or by the format-error strategy
+     * @throws RuntimeException if {@code encoding} is not a supported charset
+     */
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("Unsupported encoding: " + encoding, e);
+        }
+        String[] data = splitColumns(body);
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        return skip ? null : rowData;
+    }
+
+    /**
+     * Splits a message body into column tokens, preserving empty tokens. A single-column table
+     * takes the whole body as its only token.
+     */
+    private String[] splitColumns(String body) {
+        if (columnSize == 1) {
+            return new String[] {body};
+        }
+        if (fieldDelimiter == null || fieldDelimiter.length() <= 1) {
+            // Null/empty/single-character delimiters keep their established split semantics.
+            return StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        }
+        // splitPreserveAllTokens treats its argument as a SET of separator characters, which
+        // breaks multi-character delimiters such as "||"; split on the whole delimiter instead.
+        return StringUtils.splitByWholeSeparatorPreserveAllTokens(body, fieldDelimiter);
+    }
+
+    /**
+     * Returns the raw token for column {@code index}: the whole line for a single-column table,
+     * otherwise {@code data[index]}, or {@code null} when {@code data} has no such token.
+     */
+    private String getValue(String[] data, String line, int index) {
+        if (columnSize == 1) {
+            return line;
+        }
+        return index < data.length ? data[index] : null;
+    }
+
+    /**
+     * Returns whether the named column maps to {@link ByteSerializer.ValueType#V_ByteArray}.
+     *
+     * <p>Looks the type up in the serializable {@code fieldTypes} array rather than the transient
+     * {@code tableSchema} (which is {@code null} after Java deserialization), and returns {@code
+     * false} for unknown column names instead of failing on a {@code null} index.
+     */
+    private boolean isByteArrayType(String fieldName) {
+        Integer index = columnIndexMapping.get(fieldName);
+        if (index == null) {
+            return false;
+        }
+        return fieldTypes[index] == ByteSerializer.ValueType.V_ByteArray;
+    }
+
+    /**
+     * Applies {@code formatErrorStrategy} after the conversion of column {@code index} failed.
+     *
+     * @param row the row being built; the failed column is set to {@code null} for CUT, NULL, PAD
+     *     and any other unlisted strategy
+     * @param index index of the column whose conversion failed
+     * @param data the split message tokens; may be shorter than the column count when the
+     *     field-missing strategy kept the message
+     * @param e the conversion failure
+     * @return {@code true} if the whole message should be dropped (SKIP, SKIP_SILENT)
+     * @throws RuntimeException wrapping {@code e}, with column context, for EXCEPTION
+     */
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        // Never index data blindly: it can be shorter than the schema, and an out-of-bounds
+        // access here would mask the original conversion error.
+        Object fieldData = index < data.length ? data[index] : null;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                {
+                    long now = System.currentTimeMillis();
+                    if (columnErrorDebug
+                            || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                        logger.warn(
+                                "Data format error, field type: {}, field data: {}, index: {},"
+                                        + " data: [{}]",
+                                fieldTypes[index],
+                                fieldData,
+                                index,
+                                StringUtils.join(data, ","),
+                                e);
+                        lastLogExceptionTime = now;
+                    }
+                    return true;
+                }
+            case SKIP_SILENT:
+                return true;
+            case EXCEPTION:
+                throw new RuntimeException(
+                        "Data format error, field type: "
+                                + fieldTypes[index]
+                                + ", field data: "
+                                + fieldData
+                                + ", index: "
+                                + index,
+                        e);
+            case CUT:
+            case NULL:
+            case PAD:
+            default:
+                row.setField(index, null);
+                return false;
+        }
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       A log statement is needed here to record what happened; in addition, this exception is thrown without a message or cause, so it gives no information about the failure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org