Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/08/12 07:56:28 UTC

[GitHub] [rocketmq-externals] SteNicholas opened a new pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

SteNicholas opened a new pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779


   ## What is the purpose of the change
   
   At present, the RocketMQ connector only provides an implementation of the legacy SourceFunction interface, and a `ScanTableSource` implementation is needed to expose Table/SQL capabilities. The RocketMQ connector should therefore support a RocketMQ TableSource based on the legacy SourceFunction interface.
   
   ## Brief changelog
   
    - Support `RocketMQScanTableSource` based on `RocketMQSourceFunction`, the implementation of the legacy `SourceFunction` interface. A minimal sketch of this wiring is shown below.
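
   For context, the following is a minimal, illustrative sketch (not the code in this pull request) of how a legacy `SourceFunction` can back a `ScanTableSource` through Flink's `SourceFunctionProvider`. The class name and constructor here are assumptions for illustration only.

   ```java
   import org.apache.flink.streaming.api.functions.source.SourceFunction;
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.table.connector.source.DynamicTableSource;
   import org.apache.flink.table.connector.source.ScanTableSource;
   import org.apache.flink.table.connector.source.SourceFunctionProvider;
   import org.apache.flink.table.data.RowData;

   /** Illustrative sketch: expose a legacy SourceFunction as a Table/SQL scan source. */
   public class LegacyBackedScanTableSource implements ScanTableSource {

       private final SourceFunction<RowData> sourceFunction; // e.g. a RocketMQSourceFunction

       public LegacyBackedScanTableSource(SourceFunction<RowData> sourceFunction) {
           this.sourceFunction = sourceFunction;
       }

       @Override
       public ChangelogMode getChangelogMode() {
           // A message-queue source emits append-only rows.
           return ChangelogMode.insertOnly();
       }

       @Override
       public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
           // 'false' marks the source as unbounded (streaming).
           return SourceFunctionProvider.of(sourceFunction, false);
       }

       @Override
       public DynamicTableSource copy() {
           return new LegacyBackedScanTableSource(sourceFunction);
       }

       @Override
       public String asSummaryString() {
           return "RocketMQ (sketch)";
       }
   }
   ```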
   
   ## Verifying this change
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice: `it would be helpful if you could complete the following 5 checklist items (the last one is not mandatory) before requesting the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; the more mocks the better when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure the basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] SteNicholas commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900864108


   @ni-ze, thanks for reporting the problems with `RocketMQSource`. Those problems could be fixed in another pull request.





[GitHub] [rocketmq-externals] duhenglucky commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r690837086



##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {

Review comment:
       Keep the code format consistent with the other switch branches; a sketch follows.
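
       For example, the CUT/NULL/PAD branch could be laid out like its neighbors (a sketch of the formatting only, not new behavior):

       ```java
       case CUT:
       case NULL:
       case PAD:
           return data; // no extra block braces, matching the other branches
       ```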

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();
+        }
+    }
+
+    private String[] handleFieldIncrement(String[] data) {
+        switch (fieldIncrementStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field increment error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                return data;
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       Same as the comment above: keep the branch formatting consistent.

##########
File path: rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
+public class RocketMQDynamicTableSourceFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(Column.physical("name", STRING().notNull())),
+                    new ArrayList<>(),
+                    null);
+
+    private static final String IDENTIFIER = "rocketmq";
+    private static final String TOPIC = "test_source";
+    private static final String CONSUMER_GROUP = "test_consumer";
+    private static final String NAME_SERVER_ADDRESS =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";

Review comment:
       Please do not include any vendor-specific code
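
       For instance (an illustrative, vendor-neutral placeholder):

       ```java
       private static final String NAME_SERVER_ADDRESS = "localhost:9876"; // neutral test address
       ```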

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       There seems to be a need for a log message to record what happened, and this exception does not carry any information. One possible shape is sketched below.
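
       A sketch, not prescriptive; the field names follow the surrounding class:

       ```java
       logger.error(
               "Field missing: expected {} columns but the record has {} fields, data: [{}]",
               columnSize, data.length, StringUtils.join(data, ","));
       throw new RuntimeException(
               "Field missing: expected " + columnSize + " columns but got " + data.length + " fields");
       ```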







[GitHub] [rocketmq-externals] SteNicholas edited a comment on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
SteNicholas edited a comment on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900766859


   > @duhenglucky @RongtongJin @SteNicholas This connector may be donated to Flink like this: [apache/flink#15304](https://github.com/apache/flink/pull/15304). I would like to help if we decide to take that step.
   
   @vongosling @duhenglucky, IMO, the RocketMQ connector couldn't be contributed to the Flink connectors in the short term. It is recommended to graduate this connector first.





[GitHub] [rocketmq-externals] ni-ze edited a comment on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
ni-ze edited a comment on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900797723


   @SteNicholas, the thread pool is not closed when the job finishes:
   https://github.com/apache/rocketmq-externals/issues/786#issue-973254402
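
   A hedged sketch of the kind of cleanup that would address this, assuming the source keeps its fetch thread pool in a field named `executor` (an assumption for illustration):

   ```java
   @Override
   public void close() throws Exception {
       super.close();
       if (executor != null) {
           executor.shutdown(); // stop accepting new fetch tasks
           if (!executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
               executor.shutdownNow(); // force-stop if draining times out
           }
       }
   }
   ```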
   





[GitHub] [rocketmq-externals] SteNicholas commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900866750


   > @SteNicholas
   > MessageQueues are never updated while the Flink job is running. How can message queue rebalancing be supported?
   > 
   > [#787 (comment)](https://github.com/apache/rocketmq-externals/issues/787#issue-973258768)
   
   @ni-ze, in the legacy `RocketMQSource`, the `MessageQueue` set does not support rebalancing while the Flink job is running.





[GitHub] [rocketmq-externals] vongosling commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900869821


   @SteNicholas I see the status of our Flink integration; there is still improvement to be made before we call for graduation. Regarding #787 (comment), do we have a plan to support this, like the Kafka consumer's topic and partition discovery feature described here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery ? A rough sketch follows.
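
   For reference, a rough sketch of periodic queue discovery, analogous to Kafka's partition discovery; the consumer, topic, and 30-second interval below are assumptions for illustration:

   ```java
   // Sketch: 'consumer' is a DefaultMQPullConsumer and 'topic' the subscribed topic.
   AtomicReference<Set<MessageQueue>> assigned = new AtomicReference<>(Collections.emptySet());
   ScheduledExecutorService discovery = Executors.newSingleThreadScheduledExecutor();
   discovery.scheduleWithFixedDelay(() -> {
       try {
           Set<MessageQueue> latest = consumer.fetchSubscribeMessageQueues(topic);
           if (!latest.equals(assigned.get())) {
               assigned.set(latest); // trigger queue reassignment across subtasks here
           }
       } catch (MQClientException e) {
           // keep the job running; retry on the next tick
       }
   }, 30, 30, TimeUnit.SECONDS);
   ```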


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] duhenglucky commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r690837086



##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value..
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + "field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + columnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {

Review comment:
       Keep the code format consistent with other branches

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row-based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + data.length
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();
+        }
+    }
+
+    private String[] handleFieldIncrement(String[] data) {
+        switch (fieldIncrementStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field increment error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + data.length
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                return data;
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       Same as the comment above.
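
   For context, a minimal usage sketch of the class under review, derived only from the constructor and methods shown in this diff; the wrapper class name and sample values are invented:

   ```java
   import static org.apache.flink.table.api.DataTypes.STRING;

   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.data.RowData;
   import org.apache.rocketmq.flink.legacy.common.serialization.RowKeyValueDeserializationSchema;
   import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;

   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;

   public class RowDeserializationSketch {
       public static void main(String[] args) {
           // One STRING column named "name"; the whole message body maps to that single field.
           TableSchema schema = TableSchema.builder().field("name", STRING()).build();
           RowKeyValueDeserializationSchema deserializer =
                   new RowKeyValueDeserializationSchema(
                           schema,
                           DirtyDataStrategy.SKIP,   // formatErrorStrategy
                           DirtyDataStrategy.SKIP,   // fieldMissingStrategy
                           DirtyDataStrategy.CUT,    // fieldIncrementStrategy
                           "UTF-8",                  // encoding
                           ",",                      // fieldDelimiter
                           false,                    // columnErrorDebug
                           new HashMap<>());         // properties
           RowData row = deserializer.deserializeKeyAndValue(
                   null, "Alice".getBytes(StandardCharsets.UTF_8));
           System.out.println(row);
       }
   }
   ```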

##########
File path: rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
+public class RocketMQDynamicTableSourceFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(Column.physical("name", STRING().notNull())),
+                    new ArrayList<>(),
+                    null);
+
+    private static final String IDENTIFIER = "rocketmq";
+    private static final String TOPIC = "test_source";
+    private static final String CONSUMER_GROUP = "test_consumer";
+    private static final String NAME_SERVER_ADDRESS =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";

Review comment:
       Please do not include any vendor-specific code
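
   A vendor-neutral replacement could be as simple as pointing the test at a generic local endpoint (the address below is a made-up placeholder, not a real environment):

   ```java
   // Hypothetical replacement: a generic local name server for the test.
   private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
   ```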

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row-based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final boolean columnErrorDebug;
+    private final int columnSize;
+    private final ByteSerializer.ValueType[] fieldTypes;
+    private final transient DataType[] fieldDataTypes;
+    private final Map<String, Integer> columnIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.columnSize = tableSchema.getFieldNames().length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int index = 0; index < columnSize; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < columnSize; index++) {
+            ByteSerializer.ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+        }
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (isOnlyHaveVarbinaryDataField()) {
+            GenericRowData rowData = new GenericRowData(columnSize);
+            rowData.setField(0, value);
+            return rowData;
+        } else {
+            if (value == null) {
+                logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                return null;
+            }
+            return deserializeValue(value);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (columnSize == 1) {
+            return isByteArrayType(tableSchema.getFieldNames()[0]);
+        }
+        return false;
+    }
+
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (columnSize == 1) {
+            data = new String[1];
+            data[0] = body;
+        }
+        if (data.length < columnSize) {
+            data = handleFieldMissing(data);
+        } else if (data.length > columnSize) {
+            data = handleFieldIncrement(data);
+        }
+        if (data == null) {
+            return null;
+        }
+        GenericRowData rowData = new GenericRowData(columnSize);
+        boolean skip = false;
+        for (int index = 0; index < columnSize; index++) {
+            try {
+                String fieldValue = getValue(data, body, index);
+                rowData.setField(
+                        index,
+                        StringSerializer.deserialize(
+                                fieldValue,
+                                fieldTypes[index],
+                                fieldDataTypes[index],
+                                new HashSet<>()));
+            } catch (Exception e) {
+                skip = handleException(rowData, index, data, e);
+            }
+        }
+        if (skip) {
+            return null;
+        }
+        return rowData;
+    }
+
+    private String getValue(String[] data, String line, int index) {
+        String fieldValue = null;
+        if (columnSize == 1) {
+            fieldValue = line;
+        } else {
+            if (index < data.length) {
+                fieldValue = data[index];
+            }
+        }
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ByteSerializer.ValueType valueType =
+                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ByteSerializer.ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + columnSize
+                                    + ", data column number: "
+                                    + data.length
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    return data;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();

Review comment:
       A logger call seems to be needed here to record what happened, and this exception does not carry any information.
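
   A sketch of the kind of change the comment asks for, with an invented message (the exact wording is up to the author):

   ```java
   case EXCEPTION:
       // Build a descriptive message so the failure is diagnosable from the logs.
       String detail =
               "Field missing error, table column number: " + columnSize
                       + ", data field number: " + data.length
                       + ", data: [" + StringUtils.join(data, ",") + "]";
       logger.error(detail);
       throw new RuntimeException(detail);
   ```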




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] duhenglucky merged pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
duhenglucky merged pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] vongosling commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r690949600



##########
File path: rocketmq-flink/pom.xml
##########
@@ -45,11 +45,13 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>

Review comment:
       Good polish




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] vongosling commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-899954733


   @duhenglucky @RongtongJin @SteNicholas This connector could be donated to Flink in the same way as https://github.com/apache/flink/pull/15304. I would like to help if we can move it forward.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] ni-ze commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
ni-ze commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r690837199



##########
File path: rocketmq-flink/pom.xml
##########
@@ -34,7 +34,7 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <rocketmq.version>4.7.1</rocketmq.version>
-        <flink.version>1.13.0</flink.version>
+        <flink.version>1.13.1</flink.version>

Review comment:
       Why does the rocketmq-flink package bundle the Flink jars? That will lead to a package conflict when running on Flink.
   
   ![image](https://user-images.githubusercontent.com/31175234/129822920-be04ac7f-53c8-4c28-b749-7088237fced2.png)
   
   The scope should be set to "provided":
   
   ```xml
   		<dependency>
   			<groupId>org.apache.flink</groupId>
   			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   			<version>${project.version}</version>
   			<scope>provided</scope>
   		</dependency>
   		<dependency>
   			<groupId>org.apache.flink</groupId>
   			<artifactId>flink-java</artifactId>
   			<version>${project.version}</version>
   			<scope>provided</scope>
   		</dependency>
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] ni-ze edited a comment on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
ni-ze edited a comment on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900797723


   @SteNicholas The thread pool is not shut down when the job finishes:
   https://github.com/apache/rocketmq-externals/issues/786#issue-973254402
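
   A hedged sketch of the fix the issue suggests: shut the pull thread pool down in the source's close path. The executor field and its lifecycle are assumptions about the RocketMQSourceFunction internals:

   ```java
   @Override
   public void close() throws Exception {
       super.close();
       // Release the pull threads so the TaskManager can reclaim them when the job finishes.
       if (executor != null) { // assumed field holding the pull thread pool
           executor.shutdown();
           if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
               executor.shutdownNow();
           }
       }
   }
   ```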
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] duhenglucky commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900745867


   > @duhenglucky @RongtongJin @SteNicholas This connector could be donated to Flink in the same way as [apache/flink#15304](https://github.com/apache/flink/pull/15304). I would like to help if we can move it forward.
   
   This is also one of @SteNicholas's targets for the near future.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] ni-ze commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
ni-ze commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900797723


   @SteNicholas 
   https://github.com/apache/rocketmq-externals/issues/786#issue-973254402
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] vongosling commented on a change in pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#discussion_r689993084



##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
##########
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0

Review comment:
       Do not reformat the license header; you can configure this in the IDE: https://www.jetbrains.com/help/idea/copyright.html#update-copyright




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] SteNicholas commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900766859


   > @duhenglucky @RongtongJin @SteNicholas This connector could be donated to Flink in the same way as [apache/flink#15304](https://github.com/apache/flink/pull/15304). I would like to help if we can move it forward.
   
   @vongosling IMO, the RocketMQ connector cannot be contributed to the Flink connectors in the short term. It is recommended to graduate this connector first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] vongosling commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900869821


   @SteNicholas Looking at the status of our Flink integration, there is still polishing to do before we call for graduation (#787 (comment)). Do we have a plan to support this, like the partition discovery feature described here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
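
   A rough sketch of what periodic queue discovery could look like on top of the pull consumer. fetchSubscribeMessageQueues comes from the RocketMQ client; the assignedQueues set, the reassignQueues hook, and the 30-second interval are assumptions:

   ```java
   ScheduledExecutorService discoveryTimer = Executors.newSingleThreadScheduledExecutor();
   discoveryTimer.scheduleWithFixedDelay(
           () -> {
               try {
                   // Re-fetch the topic's queues and react to broker-side changes.
                   Set<MessageQueue> latest = consumer.fetchSubscribeMessageQueues(topic);
                   if (!latest.equals(assignedQueues)) {
                       reassignQueues(latest); // re-run the per-subtask assignment
                   }
               } catch (MQClientException e) {
                   logger.warn("Queue discovery failed, will retry", e);
               }
           },
           30, 30, TimeUnit.SECONDS);
   ```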


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] SteNicholas edited a comment on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
SteNicholas edited a comment on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900766859


   > @duhenglucky @RongtongJin @SteNicholas This connector could be donated to Flink in the same way as [apache/flink#15304](https://github.com/apache/flink/pull/15304). I would like to help if we can move it forward.
   
   @vongosling @duhenglucky, IMO, the RocketMQ connector cannot be contributed to the Flink connectors in the short term. It is recommended to graduate this connector first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] ni-ze commented on pull request #779: [#715] Support the RocketMQ TableSource based on the legacy Source implementation

Posted by GitBox <gi...@apache.org>.
ni-ze commented on pull request #779:
URL: https://github.com/apache/rocketmq-externals/pull/779#issuecomment-900801014


   @SteNicholas 
   The MessageQueues are never updated while the Flink job is running. How do we support message queue rebalancing?
   
   https://github.com/apache/rocketmq-externals/issues/787#issue-973258768


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


