You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/22 14:03:41 UTC

[incubator-doris] branch master updated: [FlinkConnector] Make flink datastream source parameterized (#6473)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ff6eb5  [FlinkConnector] Make flink datastream source parameterized (#6473)
4ff6eb5 is described below

commit 4ff6eb55d04b5923f04e5e9f03e1acdd0a561bd1
Author: wunan1210 <wu...@gmail.com>
AuthorDate: Sun Aug 22 22:03:32 2021 +0800

    [FlinkConnector] Make flink datastream source parameterized (#6473)
    
    Make the Flink DataStream source parameterized as List<?> instead of Object.
---
 .../doris/flink/datastream/DorisSourceFunction.java      | 16 ++++++++--------
 .../deserialization/SimpleListDeserializationSchema.java |  8 +++++---
 .../apache/doris/flink/datastream/ScalaValueReader.scala |  2 +-
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 82ab224..08ec5b0 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -36,17 +36,17 @@ import java.util.List;
  * DorisSource
  **/
 
-public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
+public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
 
     private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
 
-    private DorisDeserializationSchema deserializer;
-    private DorisOptions options;
-    private DorisReadOptions readOptions;
+    private final DorisDeserializationSchema<List<?>> deserializer;
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
     private List<PartitionDefinition> dorisPartitions;
     private ScalaValueReader scalaValueReader;
 
-    public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
+    public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
         this.options = streamOptions.getOptions();
         this.readOptions = streamOptions.getReadOptions();
@@ -59,11 +59,11 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
     }
 
     @Override
-    public void run(SourceContext sourceContext) throws Exception {
+    public void run(SourceContext<List<?>> sourceContext) {
         for (PartitionDefinition partitions : dorisPartitions) {
             scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
             while (scalaValueReader.hasNext()) {
-                Object next = scalaValueReader.next();
+                List<?> next = scalaValueReader.next();
                 sourceContext.collect(next);
             }
         }
@@ -76,7 +76,7 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
 
 
     @Override
-    public TypeInformation<T> getProducedType() {
+    public TypeInformation<List<?>> getProducedType() {
         return this.deserializer.getProducedType();
     }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index 7fcf2f6..d9ec6e5 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -17,15 +17,17 @@
 package org.apache.doris.flink.deserialization;
 
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.util.List;
 
 
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
+public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> {
 
     @Override
-    public TypeInformation getProducedType() {
-        return TypeInformation.of(List.class);
+    public TypeInformation<List<?>> getProducedType() {
+        return TypeInformation.of(new TypeHint<List<?>>() {
+        });
     }
 }
diff --git a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index bdf9487..e69a86f 100644
--- a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
    * get next value.
    * @return next value
    */
-  def next: AnyRef = {
+  def next: java.util.List[_] = {
     if (!hasNext) {
       logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org