You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/22 14:03:41 UTC
[incubator-doris] branch master updated: [FlinkConnector] Make
flink datastream source parameterized (#6473)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4ff6eb5 [FlinkConnector] Make flink datastream source parameterized (#6473)
4ff6eb5 is described below
commit 4ff6eb55d04b5923f04e5e9f03e1acdd0a561bd1
Author: wunan1210 <wu...@gmail.com>
AuthorDate: Sun Aug 22 22:03:32 2021 +0800
[FlinkConnector] Make flink datastream source parameterized (#6473)
make flink datastream source parameterized as List<?> instead of Object.
---
.../doris/flink/datastream/DorisSourceFunction.java | 16 ++++++++--------
.../deserialization/SimpleListDeserializationSchema.java | 8 +++++---
.../apache/doris/flink/datastream/ScalaValueReader.scala | 2 +-
3 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 82ab224..08ec5b0 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -36,17 +36,17 @@ import java.util.List;
* DorisSource
**/
-public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
+public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
- private DorisDeserializationSchema deserializer;
- private DorisOptions options;
- private DorisReadOptions readOptions;
+ private final DorisDeserializationSchema<List<?>> deserializer;
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;
- public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
+ public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
@@ -59,11 +59,11 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
}
@Override
- public void run(SourceContext sourceContext) throws Exception {
+ public void run(SourceContext<List<?>> sourceContext) {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
- Object next = scalaValueReader.next();
+ List<?> next = scalaValueReader.next();
sourceContext.collect(next);
}
}
@@ -76,7 +76,7 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
@Override
- public TypeInformation<T> getProducedType() {
+ public TypeInformation<List<?>> getProducedType() {
return this.deserializer.getProducedType();
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index 7fcf2f6..d9ec6e5 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -17,15 +17,17 @@
package org.apache.doris.flink.deserialization;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.util.List;
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
+public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> {
@Override
- public TypeInformation getProducedType() {
- return TypeInformation.of(List.class);
+ public TypeInformation<List<?>> getProducedType() {
+ return TypeInformation.of(new TypeHint<List<?>>() {
+ });
}
}
diff --git a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index bdf9487..e69a86f 100644
--- a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
* get next value.
* @return next value
*/
- def next: AnyRef = {
+ def next: java.util.List[_] = {
if (!hasNext) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
throw new ShouldNeverHappenException
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org