Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/10/19 00:25:22 UTC

[GitHub] [rocketmq-externals] stevenchange opened a new pull request #640: [ISSUE #638] add flinksql connector rocketmq

stevenchange opened a new pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640


   ## What is the purpose of the change
   
   1. Add a Flink SQL connector for RocketMQ.
   2. Add a default serialization utility for the Flink SQL RocketMQ connector.
   3. Update the RocketMQSource and RocketMQSink files to adapt to the new SQL connector.
   
   ## Brief changelog
   
   1. Add a Flink SQL connector for RocketMQ.
   2. Add a default serialization utility for the Flink SQL RocketMQ connector.
   3. Update the RocketMQSource and RocketMQSink files to adapt to the new SQL connector.
   
   ## Verifying this change
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following five checklist items (the last one is optional) before requesting a community review of your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocks where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-externals] stevenchange commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
stevenchange commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-713257806


   @vongosling Are there any new comments on this PR?





[GitHub] [rocketmq-externals] stevenchange commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
stevenchange commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-716941678


   > Wait a minute. The code is still being reviewed.
   
   OK, got it....  Thanks a lot





[GitHub] [rocketmq-externals] vongosling closed pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling closed pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640


   





[GitHub] [rocketmq-externals] vongosling commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-733430796


   This is a second-round review. Could you check the whole change against the review comments?





[GitHub] [rocketmq-externals] vongosling commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-769667587


   Of course. Don't hesitate to get involved :-)





[GitHub] [rocketmq-externals] vongosling commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-736441441


   @stevenchange ping





[GitHub] [rocketmq-externals] vongosling commented on a change in pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#discussion_r530069295



##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
##########
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>

Review comment:
       The license header is a standard template. Please do not modify it.

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
##########
@@ -59,7 +60,9 @@
 
     private Properties props;
     private TopicSelector<IN> topicSelector;
-    private KeyValueSerializationSchema<IN> serializationSchema;
+    private KeyValueSerializationSchema<IN> keyValueSerializationSchema;
+    //flink sql connector serialization schema
+    private RmqSerializationSchema<IN> rmqSerializationSchema;

Review comment:
       RmqSerializationSchema is a confusing name. You could change the class or field to a self-explanatory name such as SQLSerializationSchema.

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
##########
@@ -152,21 +166,36 @@ public void onException(Throwable throwable) {
     }
 
     private Message prepareMessage(IN input) {
-        String topic = topicSelector.getTopic(input);
-        String tag = (tag = topicSelector.getTag(input)) != null ? tag : "";
+        Validate.notNull(input, "the input is null");

Review comment:
       prepareMessage will throw a runtime exception if you use Validate here. Is that what you expect?
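
A minimal sketch of the concern, not code from the PR: a null-check helper of this kind throws an unchecked exception, so a null input fails the call instead of being skipped. `java.util.Objects.requireNonNull` is used here as a dependency-free stand-in for `Validate.notNull` (an assumption), and `PrepareMessageSketch` with both of its methods is hypothetical.

```java
import java.util.Objects;

// Hypothetical stand-in for the sink's prepareMessage; not the PR's code.
class PrepareMessageSketch {

    // Fail-fast variant: a null input raises an unchecked NullPointerException.
    static String prepareStrict(String input) {
        Objects.requireNonNull(input, "the input is null");
        return "msg:" + input;
    }

    // Lenient variant: a null input is reported back as null so the caller can skip it.
    static String prepareLenient(String input) {
        if (input == null) {
            return null;
        }
        return "msg:" + input;
    }

    public static void main(String[] args) {
        System.out.println(prepareLenient(null));
        try {
            prepareStrict(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Which variant is right depends on whether a null record should stop the job or be dropped; the sketch only makes the difference visible.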

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
##########
@@ -351,7 +366,15 @@ public void initializeState(FunctionInitializationContext context) throws Except
 
     @Override
     public TypeInformation<OUT> getProducedType() {
-        return schema.getProducedType();
+        if(schema != null){

Review comment:
       Does the SQL schema take priority here?
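
One way to make the precedence explicit is sketched below. The diff is cut off after the first branch, so everything past `if (schema != null)` is an assumption; `ProducedTypeSketch` is hypothetical, and `Supplier<String>` merely stands in for the two schema types.

```java
import java.util.function.Supplier;

// Hypothetical illustration of schema precedence; not the PR's code.
class ProducedTypeSketch {

    // The SQL-style schema wins when both are set; the key-value schema is the fallback.
    static String producedType(Supplier<String> sqlSchema, Supplier<String> keyValueSchema) {
        if (sqlSchema != null) {
            return sqlSchema.get();
        }
        if (keyValueSchema != null) {
            return keyValueSchema.get();
        }
        // Neither schema configured: fail loudly instead of returning null.
        throw new IllegalStateException("no deserialization schema configured");
    }

    public static void main(String[] args) {
        System.out.println(producedType(() -> "row", () -> "kv"));
        System.out.println(producedType(null, () -> "kv"));
    }
}
```

If the intended priority is the other way around, swapping the two checks is the only change needed, which is why stating the order in a comment helps reviewers.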

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
##########
@@ -68,7 +70,9 @@
     private transient MQPullConsumerScheduleService pullConsumerScheduleService;
     private DefaultMQPullConsumer consumer;
 
-    private KeyValueDeserializationSchema<OUT> schema;
+    private KeyValueDeserializationSchema<OUT> keyValueSchema;
+
+    private DeserializationSchema<OUT> schema;

Review comment:
       The same optimization to-dos from my earlier suggestion apply here.

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
##########
@@ -152,21 +166,36 @@ public void onException(Throwable throwable) {
     }
 
     private Message prepareMessage(IN input) {
-        String topic = topicSelector.getTopic(input);
-        String tag = (tag = topicSelector.getTag(input)) != null ? tag : "";
+        Validate.notNull(input, "the input is null");
+
+        if (keyValueSerializationSchema != null) {

Review comment:
       Is the null check necessary here, since you have already validated it in the open method?
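
A possible shape for this, offered as a sketch under assumptions rather than the PR's design: resolve the serializer once in `open()` so the per-record path needs no null checks. `SinkSketch`, its fields, and the use of `Function<String, byte[]>` in place of the two schema types are all hypothetical.

```java
import java.util.function.Function;

// Hypothetical sink that picks its serializer once, at open time.
class SinkSketch {
    private final Function<String, byte[]> keyValueSerializer; // may be null
    private final Function<String, byte[]> sqlSerializer;      // may be null
    private Function<String, byte[]> active;                   // resolved in open()

    SinkSketch(Function<String, byte[]> keyValueSerializer, Function<String, byte[]> sqlSerializer) {
        this.keyValueSerializer = keyValueSerializer;
        this.sqlSerializer = sqlSerializer;
    }

    // Validate the configuration and choose the serializer exactly once.
    void open() {
        if (keyValueSerializer == null && sqlSerializer == null) {
            throw new IllegalStateException("no serialization schema configured");
        }
        active = keyValueSerializer != null ? keyValueSerializer : sqlSerializer;
    }

    // Per-record path: relies on open() having run, so there is no null check here.
    byte[] prepare(String input) {
        return active.apply(input);
    }
}
```

With this layout the choice between the two schemas lives in one place, and `prepare` stays a single call.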

##########
File path: rocketmq-flink/src/main/java/org/apache/rocketmq/flink/common/serialization/json/RmqJsonDecodingFormat.java
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+/**
+ * @Author: gaobo07

Review comment:
       Please remove the author and date info. You can refer to other files for the convention.







[GitHub] [rocketmq-externals] vongosling commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-716503977


   Wait a minute. The code is still being reviewed.





[GitHub] [rocketmq-externals] vongosling commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-738698495


   I will close this PR; we could do it in the newer codebase.





[GitHub] [rocketmq-externals] vongosling commented on a change in pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#discussion_r507285914



##########
File path: rocketmq-flink/pom.xml
##########
@@ -119,10 +137,50 @@
             <version>${rocketmq.version}</version>
             <scope>test</scope>
         </dependency>
+
+
+        <!-- JSON table descriptor testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <!-- Json filesystem format factory ITCase test dependency -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <!-- test utils dependency -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- JSON RowData schema test dependency -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>2.11.12</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>

Review comment:
       This is not a required dependency.

##########
File path: rocketmq-flink/src/test/java/org/apache/rocketmq/flink/common/serialization/json/RmqJsonSerDerTest.java
##########
@@ -0,0 +1,138 @@
+package org.apache.rocketmq.flink.common.serialization.json;
+
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @Author: gaobo07
+ * @Date: 2020/10/12 11:59 上午
+ */
+public class RmqJsonSerDerTest {
+
+    private static final RowType SCHEMA = (RowType) ROW(
+            FIELD("id", INT().notNull()),
+            FIELD("name", STRING()),
+            FIELD("description", STRING()),
+            FIELD("weight", DOUBLE())
+    ).getLogicalType();
+
+    private DynamicTableSource.DataStructureConverter converter;
+
+    @Test
+    public void testSerializationDeserialization() throws Exception {
+        List<String> lines = readLines("RmqJson-data.txt");
+        RmqJsonDeserializer deserializationSchema = new RmqJsonDeserializer(
+                SCHEMA,
+                new RowDataTypeInfo(SCHEMA),
+                false,
+                TimestampFormat.ISO_8601);
+
+        SimpleCollector collector = new SimpleCollector();
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        List<String> expected = Arrays.asList(
+                "+I(101,scooter,Small 2-wheel scooter,3.14)",
+                "+I(102,car battery,12V car battery,8.1)",
+                "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
+                "+I(104,hammer,12oz carpenter's hammer,0.75)",
+                "+I(105,hammer,14oz carpenter's hammer,0.875)",
+                "+I(106,hammer,16oz carpenter's hammer,1.0)",
+                "+I(107,rocks,box of assorted rocks,5.3)",
+                "+I(108,jacket,water resistent black wind breaker,0.1)",
+                "+I(109,spare tire,24 inch spare tire,22.2)",
+                "+I(110,jacket,water resistent white wind breaker,0.2)",
+                "+I(111,scooter,Big 2-wheel scooter ,5.18)"
+        );
+        List<String> actual = collector.list.stream()
+                .map(Object::toString)
+                .collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        RmqJsonSerializer serializationSchema = new RmqJsonSerializer(
+                SCHEMA,
+                TimestampFormat.SQL,
+                0);
+        serializationSchema.open(null);
+        List<String> result = new ArrayList<>();
+        for (RowData rowData : collector.list) {
+            result.add(serializationSchema.serialize(rowData).toString());
+        }
+        List<String> expectedResult = new ArrayList<>();
+        expectedResult.add(new Message("","","101", ("{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\"," +
+                "\"weight\":3.14}").getBytes()).toString());
+        expectedResult.add(new Message("","","102", ("{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car " +

Review comment:
       This dataset is not maintenance-friendly.
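
One way to address this, sketched under assumptions: keep the rows as plain data and generate the expected JSON from them, instead of hand-escaping each string. `ExpectedJsonSketch` and its methods are hypothetical; the two rows are copied from the test's expected output above, and the formatter assumes the values contain no characters that need JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper that derives expected JSON strings from tabular test data.
class ExpectedJsonSketch {

    // Rows as plain data: id, name, description, weight.
    static final Object[][] ROWS = {
        {101, "scooter", "Small 2-wheel scooter", 3.14},
        {102, "car battery", "12V car battery", 8.1},
    };

    // Formats one row as compact JSON in the field order used by the test.
    // Assumes values need no JSON escaping (no quotes or backslashes).
    static String toJson(Object[] row) {
        return String.format(Locale.ROOT,
                "{\"id\":%d,\"name\":\"%s\",\"description\":\"%s\",\"weight\":%s}",
                row[0], row[1], row[2], row[3]);
    }

    // Builds the full expected list from the data table.
    static List<String> expectedJson() {
        List<String> out = new ArrayList<>();
        for (Object[] row : ROWS) {
            out.add(toJson(row));
        }
        return out;
    }

    public static void main(String[] args) {
        expectedJson().forEach(System.out::println);
    }
}
```

Adding a row then means adding one line of data, and the same table could also feed the deserialization assertions.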







[GitHub] [rocketmq-externals] stevenchange commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
stevenchange commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-768136886


   @vongosling Very sorry, I haven't followed this for a long time. Could I work on it again?





[GitHub] [rocketmq-externals] stevenchange commented on pull request #640: [ISSUE #638] add flinksql connector rocketmq

Posted by GitBox <gi...@apache.org>.
stevenchange commented on pull request #640:
URL: https://github.com/apache/rocketmq-externals/pull/640#issuecomment-711448614


   @vongosling Could you review this PR? Thanks a lot.

