You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/07/21 11:54:41 UTC
[doris-flink-connector] branch master updated: [Feature] Support Flink 1.15 (#49)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7171929 [Feature] Support Flink 1.15 (#49)
7171929 is described below
commit 7171929fb4ab64d96e75ad3f58752e64a1a75f11
Author: wudi <67...@qq.com>
AuthorDate: Thu Jul 21 19:54:36 2022 +0800
[Feature] Support Flink 1.15 (#49)
* flink 1.15 support
---
.github/workflows/build-extension.yml | 4 +-
flink-doris-connector/pom.xml | 134 ++----------
.../apache/doris/flink/backend/BackendClient.java | 9 +-
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../flink/datastream/DorisSourceFunction.java | 14 +-
.../flink/exception/ConnectedFailedException.java | 2 +-
.../flink/exception/DorisInternalException.java | 2 +-
.../exception/ShouldNeverHappenException.java | 2 +-
.../org/apache/doris/flink/rest/RestService.java | 12 +-
.../apache/doris/flink/serialization/RowBatch.java | 7 +-
.../source/reader/DorisSourceSplitReader.java | 25 ++-
.../flink/source/reader/DorisValueReader.java | 229 +++++++++++++++++++++
.../flink/source/split/DorisSplitRecords.java | 18 +-
.../flink/table/DorisDynamicTableFactory.java | 2 +-
.../doris/flink/table/DorisRowDataInputFormat.java | 33 +--
.../doris/flink/datastream/ScalaValueReader.scala | 222 --------------------
.../apache/doris/flink/DorisSourceSinkExample.java | 1 -
.../doris/flink/serialization/TestRowBatch.java | 17 +-
18 files changed, 310 insertions(+), 425 deletions(-)
diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index b2ad6af..bc0c47c 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -46,7 +46,7 @@ jobs:
touch custom_env.sh
echo 'export THRIFT_BIN=/usr/bin/thrift' >> custom_env.sh
- - name: Build flink connector 1.14
+ - name: Build flink connector 1.15
run: |
- cd flink-doris-connector/ && /bin/bash build.sh --flink 1.14.3 --scala 2.12
+ cd flink-doris-connector/ && /bin/bash build.sh --flink 1.15.0
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index ddcd3de..27b6df1 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
<version>23</version>
</parent>
<groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-${flink.minor.version}_${scala.version}</artifactId>
+ <artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>Flink Doris Connector</name>
<url>https://doris.apache.org/</url>
@@ -62,7 +62,6 @@ under the License.
</mailingList>
</mailingLists>
<properties>
- <scala.version>${env.scala.version}</scala.version>
<flink.version>${env.flink.version}</flink.version>
<flink.minor.version>${env.flink.minor.version}</flink.minor.version>
<libthrift.version>0.13.0</libthrift.version>
@@ -111,16 +110,7 @@ under the License.
<profile>
<id>flink.version</id>
<properties>
- <env.flink.version>1.14.3</env.flink.version>
- </properties>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- </profile>
- <profile>
- <id>scala.version</id>
- <properties>
- <env.scala.version>2.12</env.scala.version>
+ <env.flink.version>1.15.0</env.flink.version>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
@@ -147,26 +137,31 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.version}</artifactId>
+ <artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.version}</artifactId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
- <!-- flink table -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.version}</artifactId>
+ <artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -235,6 +230,7 @@ under the License.
<artifactId>netty-common</artifactId>
<version>4.1.77.Final</version>
</dependency>
+ <!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -250,35 +246,6 @@ under the License.
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-web</artifactId>
- <version>${log4j2.version}</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>${log4j2.version}</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${log4j2.version}</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j2.version}</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.9</version>
- </dependency>
<!--Test-->
<dependency>
<groupId>org.hamcrest</groupId>
@@ -288,26 +255,14 @@ under the License.
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-scala_${scala.version}</artifactId>
- <version>1.4.7</version>
- <exclusions>
- <exclusion>
- <artifactId>hamcrest-core</artifactId>
- <groupId>org.hamcrest</groupId>
- </exclusion>
- </exclusions>
+ <artifactId>mockito-core</artifactId>
+ <version>2.27.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
- <exclusions>
- <exclusion>
- <artifactId>hamcrest-core</artifactId>
- <groupId>org.hamcrest</groupId>
- </exclusion>
- </exclusions>
<scope>test</scope>
</dependency>
</dependencies>
@@ -344,32 +299,6 @@ under the License.
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <args>
- <arg>-feature</arg>
- </args>
- </configuration>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -417,39 +346,6 @@ under the License.
</execution>
</executions>
</plugin>
- <!--
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- <version>0.7.8</version>
- <configuration>
- <excludes>
- <exclude>**/thrift/**</exclude>
- </excludes>
- </configuration>
- <executions>
- <execution>
- <id>prepare-agent</id>
- <goals>
- <goal>prepare-agent</goal>
- </goals>
- </execution>
- <execution>
- <id>check</id>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- <execution>
- <id>report</id>
- <phase>test</phase>
- <goals>
- <goal>report</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 9b8d955..f55d148 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.backend;
import org.apache.doris.flink.cfg.ConfigurationOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.ConnectedFailedException;
-import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisInternalException;
import org.apache.doris.flink.serialization.Routing;
import org.apache.doris.flink.util.ErrorMessages;
@@ -57,7 +56,7 @@ public class BackendClient {
private final int socketTimeout;
private final int connectTimeout;
- public BackendClient(Routing routing, DorisReadOptions readOptions) throws ConnectedFailedException {
+ public BackendClient(Routing routing, DorisReadOptions readOptions) {
this.routing = routing;
this.connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
this.socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
@@ -67,7 +66,7 @@ public class BackendClient {
open();
}
- private void open() throws ConnectedFailedException {
+ private void open() {
logger.debug("Open client to Doris BE '{}'.", routing);
TException ex = null;
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
@@ -117,7 +116,7 @@ public class BackendClient {
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
*/
- public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException {
+ public TScanOpenResult openScanner(TScanOpenParams openParams) {
logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
if (!isConnected) {
open();
@@ -153,7 +152,7 @@ public class BackendClient {
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
*/
- public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
+ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) {
logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
if (!isConnected) {
open();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2daf5e1..102a7ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -73,7 +73,7 @@ public class DorisExecutionOptions implements Serializable {
public static DorisExecutionOptions defaults() {
Properties properties = new Properties();
properties.setProperty("format", "json");
- properties.setProperty("strip_outer_array", "true");
+ properties.setProperty("read_json_by_line", "true");
return new Builder().setStreamLoadProp(properties).build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 1957139..fc5fd14 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -23,18 +23,18 @@ import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.reader.DorisValueReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
-
/**
* DorisSource
**/
@@ -48,7 +48,7 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp
private final DorisReadOptions readOptions;
private transient volatile boolean isRunning;
private List<PartitionDefinition> dorisPartitions;
- private List<PartitionDefinition> taskDorisPartitions = Lists.newArrayList();
+ private List<PartitionDefinition> taskDorisPartitions = new ArrayList<>();
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
@@ -87,11 +87,13 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp
@Override
public void run(SourceContext<List<?>> sourceContext) {
for (PartitionDefinition partitions : taskDorisPartitions) {
- try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
- while (isRunning && scalaValueReader.hasNext()) {
- List<?> next = scalaValueReader.next();
+ try (DorisValueReader valueReader = new DorisValueReader(partitions, options, readOptions)) {
+ while (isRunning && valueReader.hasNext()) {
+ List<?> next = valueReader.next();
sourceContext.collect(next);
}
+ } catch (Exception e) {
+ logger.error("close reader resource failed,", e);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
index e25d1a5..6f755b7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
@@ -17,7 +17,7 @@
package org.apache.doris.flink.exception;
-public class ConnectedFailedException extends DorisException {
+public class ConnectedFailedException extends DorisRuntimeException {
public ConnectedFailedException(String server, Throwable cause) {
super("Connect to " + server + "failed.", cause);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
index eadd860..e6756a4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
@@ -21,7 +21,7 @@ import org.apache.doris.thrift.TStatusCode;
import java.util.List;
-public class DorisInternalException extends DorisException {
+public class DorisInternalException extends DorisRuntimeException {
public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) {
super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
index a26718d..81af673 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
@@ -17,5 +17,5 @@
package org.apache.doris.flink.exception;
-public class ShouldNeverHappenException extends DorisException {
+public class ShouldNeverHappenException extends DorisRuntimeException {
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 734bfdb..ff03e01 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -20,16 +20,15 @@ package org.apache.doris.flink.rest;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.apache.commons.io.IOUtils;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.ConfigurationOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.ConnectedFailedException;
import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
@@ -37,14 +36,13 @@ import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
-import org.apache.flink.calcite.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
-
import org.slf4j.Logger;
import java.io.BufferedReader;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 4dd6732..e83300a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -33,6 +33,7 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.flink.util.Preconditions;
@@ -99,7 +100,7 @@ public class RowBatch {
this.offsetInRowBatch = 0;
}
- public RowBatch readArrow() throws DorisException {
+ public RowBatch readArrow() {
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
@@ -124,7 +125,7 @@ public class RowBatch {
return this;
} catch (Exception e) {
logger.error("Read Doris Data failed because: ", e);
- throw new DorisException(e.getMessage());
+ throw new DorisRuntimeException(e.getMessage());
} finally {
close();
}
@@ -304,7 +305,7 @@ public class RowBatch {
}
}
- public List<Object> next() throws DorisException {
+ public List<Object> next() {
if (!hasNext()) {
String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
logger.error(errMsg);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index aadf6fd..c5d33f7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -18,7 +18,6 @@ package org.apache.doris.flink.source.reader;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSplitRecords;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -43,7 +42,7 @@ public class DorisSourceSplitReader
private final Queue<DorisSourceSplit> splits;
private final DorisOptions options;
private final DorisReadOptions readOptions;
- private ScalaValueReader scalaValueReader;
+ private DorisValueReader valueReader;
private String currentSplitId;
public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) {
@@ -56,14 +55,14 @@ public class DorisSourceSplitReader
public RecordsWithSplitIds<List> fetch() throws IOException {
checkSplitOrStartNext();
- if (!scalaValueReader.hasNext()) {
+ if (!valueReader.hasNext()) {
return finishSplit();
}
- return DorisSplitRecords.forRecords(currentSplitId, scalaValueReader);
+ return DorisSplitRecords.forRecords(currentSplitId, valueReader);
}
private void checkSplitOrStartNext() throws IOException {
- if (scalaValueReader != null) {
+ if (valueReader != null) {
return;
}
final DorisSourceSplit nextSplit = splits.poll();
@@ -71,13 +70,17 @@ public class DorisSourceSplitReader
throw new IOException("Cannot fetch from another split - no split remaining");
}
currentSplitId = nextSplit.splitId();
- scalaValueReader = new ScalaValueReader(nextSplit.getPartitionDefinition(), options, readOptions);
+ valueReader = new DorisValueReader(nextSplit.getPartitionDefinition(), options, readOptions);
}
private DorisSplitRecords finishSplit() {
- if (scalaValueReader != null) {
- scalaValueReader.close();
- scalaValueReader = null;
+ if (valueReader != null) {
+ try {
+ valueReader.close();
+ } catch (Exception e) {
+ LOG.error("close resource reader failed,", e);
+ }
+ valueReader = null;
}
final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId);
currentSplitId = null;
@@ -96,8 +99,8 @@ public class DorisSourceSplitReader
@Override
public void close() throws Exception {
- if (scalaValueReader != null) {
- scalaValueReader.close();
+ if (valueReader != null) {
+ valueReader.close();
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
new file mode 100644
index 0000000..173ea90
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.backend.BackendClient;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.exception.ShouldNeverHappenException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.SchemaUtils;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.serialization.Routing;
+import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.doris.thrift.TScanBatchResult;
+import org.apache.doris.thrift.TScanCloseParams;
+import org.apache.doris.thrift.TScanNextBatchParams;
+import org.apache.doris.thrift.TScanOpenParams;
+import org.apache.doris.thrift.TScanOpenResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
+
+public class DorisValueReader implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class);
+ protected BackendClient client;
+ private PartitionDefinition partition;
+ private DorisOptions options;
+ private DorisReadOptions readOptions;
+
+ protected int offset = 0;
+ protected AtomicBoolean eos = new AtomicBoolean(false);
+ protected RowBatch rowBatch;
+
+ // flag indicate if support deserialize Arrow to RowBatch asynchronously
+ protected Boolean deserializeArrowToRowBatchAsync;
+
+ protected BlockingQueue<RowBatch> rowBatchBlockingQueue;
+ private TScanOpenParams openParams;
+ protected String contextId;
+ protected Schema schema;
+ protected boolean asyncThreadStarted;
+
+ public DorisValueReader(PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
+ this.partition = partition;
+ this.options = options;
+ this.readOptions = readOptions;
+ this.client = backendClient();
+ this.deserializeArrowToRowBatchAsync = readOptions.getDeserializeArrowAsync() == null ? DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT : readOptions.getDeserializeArrowAsync();
+
+ Integer blockingQueueSize = readOptions.getDeserializeQueueSize() == null ? DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT : readOptions.getDeserializeQueueSize();
+ if (this.deserializeArrowToRowBatchAsync) {
+ this.rowBatchBlockingQueue = new ArrayBlockingQueue(blockingQueueSize);
+ }
+ init();
+ }
+
+ private void init() {
+ this.openParams = openParams();
+ TScanOpenResult openResult = this.client.openScanner(this.openParams);
+ this.contextId = openResult.getContextId();
+ this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+ this.asyncThreadStarted = asyncThreadStarted();
+ LOG.debug("Open scan result is, contextId: {}, schema: {}.", contextId, schema);
+ }
+
+ private BackendClient backendClient() {
+ try {
+ return new BackendClient(new Routing(partition.getBeAddress()), readOptions);
+ } catch (IllegalArgumentException e) {
+ LOG.error("init backend:{} client failed,", partition.getBeAddress(), e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ private TScanOpenParams openParams() {
+ TScanOpenParams params = new TScanOpenParams();
+ params.cluster = DORIS_DEFAULT_CLUSTER;
+ params.database = partition.getDatabase();
+ params.table = partition.getTable();
+
+ params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[]{}));
+ params.opaqued_query_plan = partition.getQueryPlan();
+ // max row number of one read batch
+ Integer batchSize = readOptions.getRequestBatchSize() == null ? DORIS_BATCH_SIZE_DEFAULT : readOptions.getRequestBatchSize();
+ Integer queryDorisTimeout = readOptions.getRequestQueryTimeoutS() == null ? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT : readOptions.getRequestQueryTimeoutS();
+ Long execMemLimit = readOptions.getExecMemLimit() == null ? DORIS_EXEC_MEM_LIMIT_DEFAULT : readOptions.getExecMemLimit();
+ params.setBatchSize(batchSize);
+ params.setQueryTimeout(queryDorisTimeout);
+ params.setMemLimit(execMemLimit);
+ params.setUser(options.getUsername());
+ params.setPasswd(options.getPassword());
+ LOG.debug("Open scan params is,cluster:{},database:{},table:{},tabletId:{},batch size:{},query timeout:{},execution memory limit:{},user:{},query plan: {}",
+ params.getCluster(), params.getDatabase(), params.getTable(), params.getTabletIds(), params.getBatchSize(), params.getQueryTimeout(), params.getMemLimit(), params.getUser(), params.getOpaquedQueryPlan());
+ return params;
+ }
+
+ protected Thread asyncThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
+ nextBatchParams.setContextId(contextId);
+ while (!eos.get()) {
+ nextBatchParams.setOffset(offset);
+ TScanBatchResult nextResult = client.getNext(nextBatchParams);
+ eos.set(nextResult.isEos());
+ if (!eos.get()) {
+ RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow();
+ offset += rowBatch.getReadRowCount();
+ rowBatch.close();
+ try {
+ rowBatchBlockingQueue.put(rowBatch);
+ } catch (InterruptedException e) {
+ throw new DorisRuntimeException(e);
+ }
+ }
+ }
+ }
+ });
+
+ protected boolean asyncThreadStarted() {
+ boolean started = false;
+ if (deserializeArrowToRowBatchAsync) {
+ asyncThread.start();
+ started = true;
+ }
+ return started;
+ }
+
+ /**
+ * read data and cached in rowBatch.
+ *
+ * @return true if has next value
+ */
+ public boolean hasNext() {
+ boolean hasNext = false;
+ if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
+ // support deserialize Arrow to RowBatch asynchronously
+ if (rowBatch == null || !rowBatch.hasNext()) {
+ while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) {
+ if (!rowBatchBlockingQueue.isEmpty()) {
+ try {
+ rowBatch = rowBatchBlockingQueue.take();
+ } catch (InterruptedException e) {
+ throw new DorisRuntimeException(e);
+ }
+ hasNext = true;
+ break;
+ } else {
+ // wait for rowBatch put in queue or eos change
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ } else {
+ hasNext = true;
+ }
+ } else {
+ // Arrow data was acquired synchronously during the iterative process
+ if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+ if (rowBatch != null) {
+ offset += rowBatch.getReadRowCount();
+ rowBatch.close();
+ }
+ TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
+ nextBatchParams.setContextId(contextId);
+ nextBatchParams.setOffset(offset);
+ TScanBatchResult nextResult = client.getNext(nextBatchParams);
+ eos.set(nextResult.isEos());
+ if (!eos.get()) {
+ rowBatch = new RowBatch(nextResult, schema).readArrow();
+ }
+ }
+ hasNext = !eos.get();
+ }
+ return hasNext;
+ }
+
+ /**
+ * get next value.
+ *
+ * @return next value
+ */
+ public List next() {
+ if (!hasNext()) {
+ LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
+ throw new ShouldNeverHappenException();
+ }
+ return rowBatch.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ TScanCloseParams closeParams = new TScanCloseParams();
+ closeParams.setContextId(contextId);
+ client.closeScanner(closeParams);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
index 6f02446..9d1bbd7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -16,7 +16,7 @@
// under the License.
package org.apache.doris.flink.source.split;
-import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.doris.flink.source.reader.DorisValueReader;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import javax.annotation.Nullable;
@@ -26,25 +26,25 @@ import java.util.Set;
/**
* An implementation of {@link RecordsWithSplitIds}.
- * This is essentially a slim wrapper around the {@link ScalaValueReader} that only adds
+ * This is essentially a slim wrapper around the {@link DorisValueReader} that only adds
* information about the current split, or finished splits
*/
public class DorisSplitRecords implements RecordsWithSplitIds<List> {
private final Set<String> finishedSplits;
- private final ScalaValueReader scalaValueReader;
+ private final DorisValueReader valueReader;
private String splitId;
public DorisSplitRecords(String splitId,
- ScalaValueReader scalaValueReader,
+ DorisValueReader valueReader,
Set<String> finishedSplits) {
this.splitId = splitId;
- this.scalaValueReader = scalaValueReader;
+ this.valueReader = valueReader;
this.finishedSplits = finishedSplits;
}
public static DorisSplitRecords forRecords(
- final String splitId, final ScalaValueReader valueReader) {
+ final String splitId, final DorisValueReader valueReader) {
return new DorisSplitRecords(splitId, valueReader, Collections.emptySet());
}
@@ -58,7 +58,7 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> {
// move the split one (from current value to null)
final String nextSplit = this.splitId;
this.splitId = null;
- if (scalaValueReader == null || !scalaValueReader.hasNext()) {
+ if (valueReader == null || !valueReader.hasNext()) {
return null;
}
return nextSplit;
@@ -67,8 +67,8 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> {
@Nullable
@Override
public List nextRecordFromSplit() {
- if (scalaValueReader != null && scalaValueReader.hasNext()) {
- List next = scalaValueReader.next();
+ if (valueReader != null && valueReader.hasNext()) {
+ List next = valueReader.next();
return next;
}
return null;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index be00cff..fb44359 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,7 +147,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
.key("sink.label-prefix")
.stringType()
- .noDefaultValue()
+ .defaultValue("")
.withDescription("the unique label prefix.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 16cf2ee..7181ce6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -18,9 +18,9 @@ package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.source.reader.DorisValueReader;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
@@ -30,17 +30,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
@@ -59,7 +54,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
private List<PartitionDefinition> dorisPartitions;
private TypeInformation<RowData> rowDataTypeInfo;
- private ScalaValueReader scalaValueReader;
+ private DorisValueReader valueReader;
private transient boolean hasNext;
private final DorisRowConverter rowConverter;
@@ -105,8 +100,8 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
*/
@Override
public void open(DorisTableInputSplit inputSplit) throws IOException {
- scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions);
- hasNext = scalaValueReader.hasNext();
+ valueReader = new DorisValueReader(inputSplit.partition, options, readOptions);
+ hasNext = valueReader.hasNext();
}
/**
@@ -147,27 +142,13 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
if (!hasNext) {
return null;
}
- List next = (List) scalaValueReader.next();
+ List next = valueReader.next();
RowData genericRowData = rowConverter.convertInternal(next);
//update hasNext after we've read the record
- hasNext = scalaValueReader.hasNext();
+ hasNext = valueReader.hasNext();
return genericRowData;
}
- private Object deserialize(LogicalType type, Object val) {
- switch (type.getTypeRoot()) {
- case DECIMAL:
- final DecimalType decimalType = ((DecimalType) type);
- final int precision = decimalType.getPrecision();
- final int scala = decimalType.getScale();
- return DecimalData.fromBigDecimal((BigDecimal) val, precision, scala);
- case VARCHAR:
- return StringData.fromString((String) val);
- default:
- return val;
- }
- }
-
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return cachedStatistics;
@@ -249,7 +230,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
public DorisRowDataInputFormat build() {
return new DorisRowDataInputFormat(
- optionsBuilder.build(), partitions, readOptions, rowType
+ optionsBuilder.build(), partitions, readOptions, rowType
);
}
}
diff --git a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
deleted file mode 100644
index 06df2ef..0000000
--- a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.datastream
-
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.doris.flink.backend.BackendClient
-import org.apache.doris.flink.cfg.ConfigurationOptions._
-import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions}
-import org.apache.doris.flink.exception.ShouldNeverHappenException
-import org.apache.doris.flink.rest.{PartitionDefinition, SchemaUtils}
-import org.apache.doris.flink.rest.models.Schema
-import org.apache.doris.flink.serialization.{Routing, RowBatch}
-import org.apache.doris.flink.util.ErrorMessages
-import org.apache.doris.flink.util.ErrorMessages._
-import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-import scala.util.control.Breaks
-
-/**
- * read data from Doris BE to array.
- * @param partition Doris RDD partition
- * @param options request configuration
- */
-class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) extends AutoCloseable {
- protected val logger = Logger.getLogger(classOf[ScalaValueReader])
-
- protected val client = new BackendClient(new Routing(partition.getBeAddress), readOptions)
- protected var offset = 0
- protected var eos: AtomicBoolean = new AtomicBoolean(false)
- protected var rowBatch: RowBatch = _
- // flag indicate if support deserialize Arrow to RowBatch asynchronously
- protected var deserializeArrowToRowBatchAsync: java.lang.Boolean = Try {
- if(readOptions.getDeserializeArrowAsync == null ) DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT else readOptions.getDeserializeArrowAsync
- } getOrElse {
- logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, readOptions.getDeserializeArrowAsync)
- DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
- }
-
- protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
- val blockingQueueSize = Try {
- if(readOptions.getDeserializeQueueSize == null) DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT else readOptions.getDeserializeQueueSize
- } getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, readOptions.getDeserializeQueueSize)
- DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
- }
-
- var queue: BlockingQueue[RowBatch] = null
- if (deserializeArrowToRowBatchAsync) {
- queue = new ArrayBlockingQueue(blockingQueueSize)
- }
- queue
- }
-
- private val openParams: TScanOpenParams = {
- val params = new TScanOpenParams
- params.cluster = DORIS_DEFAULT_CLUSTER
- params.database = partition.getDatabase
- params.table = partition.getTable
-
- params.tablet_ids = partition.getTabletIds.toList
- params.opaqued_query_plan = partition.getQueryPlan
-
- // max row number of one read batch
- val batchSize = Try {
- if(readOptions.getRequestBatchSize == null) DORIS_BATCH_SIZE_DEFAULT else readOptions.getRequestBatchSize;
- } getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, readOptions.getRequestBatchSize)
- DORIS_BATCH_SIZE_DEFAULT
- }
-
- val queryDorisTimeout = Try {
- if(readOptions.getRequestQueryTimeoutS == null) DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT else readOptions.getRequestQueryTimeoutS
- } getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, readOptions.getRequestQueryTimeoutS)
- DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
- }
-
- val execMemLimit = Try {
- if(readOptions.getExecMemLimit == null) DORIS_EXEC_MEM_LIMIT_DEFAULT else readOptions.getExecMemLimit
- } getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, readOptions.getExecMemLimit)
- DORIS_EXEC_MEM_LIMIT_DEFAULT
- }
-
- params.setBatchSize(batchSize)
- params.setQueryTimeout(queryDorisTimeout)
- params.setMemLimit(execMemLimit)
- params.setUser(options.getUsername)
- params.setPasswd(options.getPassword)
-
- logger.debug(s"Open scan params is, " +
- s"cluster: ${params.getCluster}, " +
- s"database: ${params.getDatabase}, " +
- s"table: ${params.getTable}, " +
- s"tabletId: ${params.getTabletIds}, " +
- s"batch size: $batchSize, " +
- s"query timeout: $queryDorisTimeout, " +
- s"execution memory limit: $execMemLimit, " +
- s"user: ${params.getUser}, " +
- s"query plan: ${params.getOpaquedQueryPlan}")
-
- params
- }
-
- protected val openResult: TScanOpenResult = client.openScanner(openParams)
- protected val contextId: String = openResult.getContextId
- protected val schema: Schema =
- SchemaUtils.convertToSchema(openResult.getSelectedColumns)
-
- protected val asyncThread: Thread = new Thread {
- override def run {
- val nextBatchParams = new TScanNextBatchParams
- nextBatchParams.setContextId(contextId)
- while (!eos.get) {
- nextBatchParams.setOffset(offset)
- val nextResult = client.getNext(nextBatchParams)
- eos.set(nextResult.isEos)
- if (!eos.get) {
- val rowBatch = new RowBatch(nextResult, schema).readArrow()
- offset += rowBatch.getReadRowCount
- rowBatch.close
- rowBatchBlockingQueue.put(rowBatch)
- }
- }
- }
- }
-
- protected val asyncThreadStarted: Boolean = {
- var started = false
- if (deserializeArrowToRowBatchAsync) {
- asyncThread.start
- started = true
- }
- started
- }
-
- logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
-
- /**
- * read data and cached in rowBatch.
- * @return true if hax next value
- */
- def hasNext: Boolean = {
- var hasNext = false
- if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
- // support deserialize Arrow to RowBatch asynchronously
- if (rowBatch == null || !rowBatch.hasNext) {
- val loop = new Breaks
- loop.breakable {
- while (!eos.get || !rowBatchBlockingQueue.isEmpty) {
- if (!rowBatchBlockingQueue.isEmpty) {
- rowBatch = rowBatchBlockingQueue.take
- hasNext = true
- loop.break
- } else {
- // wait for rowBatch put in queue or eos change
- Thread.sleep(5)
- }
- }
- }
- } else {
- hasNext = true
- }
- } else {
- // Arrow data was acquired synchronously during the iterative process
- if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
- if (rowBatch != null) {
- offset += rowBatch.getReadRowCount
- rowBatch.close
- }
- val nextBatchParams = new TScanNextBatchParams
- nextBatchParams.setContextId(contextId)
- nextBatchParams.setOffset(offset)
- val nextResult = client.getNext(nextBatchParams)
- eos.set(nextResult.isEos)
- if (!eos.get) {
- rowBatch = new RowBatch(nextResult, schema).readArrow()
- }
- }
- hasNext = !eos.get
- }
- hasNext
- }
-
- /**
- * get next value.
- * @return next value
- */
- def next: java.util.List[_] = {
- if (!hasNext) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
- throw new ShouldNeverHappenException
- }
- rowBatch.next
- }
-
- def close(): Unit = {
- val closeParams = new TScanCloseParams
- closeParams.setContextId(contextId)
- client.closeScanner(closeParams)
- }
-
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index 60524c8..dae75f0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -24,7 +24,6 @@ public class DorisSourceSinkExample {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 261acbe..8e7368c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -41,8 +41,6 @@ import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
-import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.data.DecimalData;
import org.junit.Assert;
import org.junit.Rule;
@@ -55,6 +53,7 @@ import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
@@ -70,7 +69,7 @@ public class TestRowBatch {
@Test
public void testRowBatch() throws Exception {
// schema
- ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ List<Field> childrenBuilder = new ArrayList<>();
childrenBuilder.add(new Field("k0", FieldType.nullable(new ArrowType.Bool()), null));
childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Int(8, true)), null));
childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Int(16, true)), null));
@@ -84,7 +83,7 @@ public class TestRowBatch {
childrenBuilder.add(new Field("k6", FieldType.nullable(new ArrowType.Utf8()), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
@@ -241,7 +240,7 @@ public class TestRowBatch {
RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
- List<Object> expectedRow1 = Lists.newArrayList(
+ List<Object> expectedRow1 = Arrays.asList(
Boolean.TRUE,
(byte) 1,
(short) 1,
@@ -310,11 +309,11 @@ public class TestRowBatch {
byte[] binaryRow1 = {'d', 'e', 'f'};
byte[] binaryRow2 = {'g', 'h', 'i'};
- ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ List <Field> childrenBuilder = new ArrayList<>();
childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Binary()), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
@@ -378,11 +377,11 @@ public class TestRowBatch {
@Test
public void testDecimalV2() throws Exception {
- ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ List<Field> childrenBuilder = new ArrayList<>();
childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Decimal(27, 9)), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org