Posted to commits@doris.apache.org by ji...@apache.org on 2022/07/21 11:54:41 UTC

[doris-flink-connector] branch master updated: [Feature] Support Flink 1.15 (#49)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7171929  [Feature] Support Flink 1.15 (#49)
7171929 is described below

commit 7171929fb4ab64d96e75ad3f58752e64a1a75f11
Author: wudi <67...@qq.com>
AuthorDate: Thu Jul 21 19:54:36 2022 +0800

    [Feature] Support Flink 1.15 (#49)
    
    * flink 1.15 support
---
 .github/workflows/build-extension.yml              |   4 +-
 flink-doris-connector/pom.xml                      | 134 ++----------
 .../apache/doris/flink/backend/BackendClient.java  |   9 +-
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../flink/datastream/DorisSourceFunction.java      |  14 +-
 .../flink/exception/ConnectedFailedException.java  |   2 +-
 .../flink/exception/DorisInternalException.java    |   2 +-
 .../exception/ShouldNeverHappenException.java      |   2 +-
 .../org/apache/doris/flink/rest/RestService.java   |  12 +-
 .../apache/doris/flink/serialization/RowBatch.java |   7 +-
 .../source/reader/DorisSourceSplitReader.java      |  25 ++-
 .../flink/source/reader/DorisValueReader.java      | 229 +++++++++++++++++++++
 .../flink/source/split/DorisSplitRecords.java      |  18 +-
 .../flink/table/DorisDynamicTableFactory.java      |   2 +-
 .../doris/flink/table/DorisRowDataInputFormat.java |  33 +--
 .../doris/flink/datastream/ScalaValueReader.scala  | 222 --------------------
 .../apache/doris/flink/DorisSourceSinkExample.java |   1 -
 .../doris/flink/serialization/TestRowBatch.java    |  17 +-
 18 files changed, 310 insertions(+), 425 deletions(-)

diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index b2ad6af..bc0c47c 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -46,7 +46,7 @@ jobs:
         touch custom_env.sh
         echo 'export THRIFT_BIN=/usr/bin/thrift' >> custom_env.sh
 
-    - name: Build flink connector 1.14
+    - name: Build flink connector 1.15
       run: |
-        cd flink-doris-connector/ && /bin/bash build.sh --flink 1.14.3 --scala 2.12
+        cd flink-doris-connector/ && /bin/bash build.sh --flink 1.15.0
 
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index ddcd3de..27b6df1 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
         <version>23</version>
     </parent>
     <groupId>org.apache.doris</groupId>
-    <artifactId>flink-doris-connector-${flink.minor.version}_${scala.version}</artifactId>
+    <artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
     <version>1.0.0-SNAPSHOT</version>
     <name>Flink Doris Connector</name>
     <url>https://doris.apache.org/</url>
@@ -62,7 +62,6 @@ under the License.
         </mailingList>
     </mailingLists>
     <properties>
-        <scala.version>${env.scala.version}</scala.version>
         <flink.version>${env.flink.version}</flink.version>
         <flink.minor.version>${env.flink.minor.version}</flink.minor.version>
         <libthrift.version>0.13.0</libthrift.version>
@@ -111,16 +110,7 @@ under the License.
         <profile>
             <id>flink.version</id>
             <properties>
-                <env.flink.version>1.14.3</env.flink.version>
-            </properties>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-        </profile>
-        <profile>
-            <id>scala.version</id>
-            <properties>
-                <env.scala.version>2.12</env.scala.version>
+                <env.flink.version>1.15.0</env.flink.version>
             </properties>
             <activation>
                 <activeByDefault>true</activeByDefault>
@@ -147,26 +137,31 @@ under the License.
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.version}</artifactId>
+            <artifactId>flink-connector-base</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${scala.version}</artifactId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
-        <!--    flink table -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.version}</artifactId>
+            <artifactId>flink-table-runtime</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -235,6 +230,7 @@ under the License.
             <artifactId>netty-common</artifactId>
             <version>4.1.77.Final</version>
         </dependency>
+        <!--  jackson  -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
@@ -250,35 +246,6 @@ under the License.
             <artifactId>jackson-databind</artifactId>
             <version>2.13.3</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-web</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.9</version>
-        </dependency>
         <!--Test-->
         <dependency>
             <groupId>org.hamcrest</groupId>
@@ -288,26 +255,14 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-scala_${scala.version}</artifactId>
-            <version>1.4.7</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>hamcrest-core</artifactId>
-                    <groupId>org.hamcrest</groupId>
-                </exclusion>
-            </exclusions>
+            <artifactId>mockito-core</artifactId>
+            <version>2.27.0</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>hamcrest-core</artifactId>
-                    <groupId>org.hamcrest</groupId>
-                </exclusion>
-            </exclusions>
             <scope>test</scope>
         </dependency>
     </dependencies>
@@ -344,32 +299,6 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>3.2.1</version>
-                <executions>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <args>
-                        <arg>-feature</arg>
-                    </args>
-                </configuration>
-            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -417,39 +346,6 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
-            <!--
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.7.8</version>
-                <configuration>
-                    <excludes>
-                        <exclude>**/thrift/**</exclude>
-                    </excludes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>prepare-agent</id>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>check</id>
-                        <goals>
-                            <goal>check</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>report</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 9b8d955..f55d148 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.backend;
 import org.apache.doris.flink.cfg.ConfigurationOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.ConnectedFailedException;
-import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisInternalException;
 import org.apache.doris.flink.serialization.Routing;
 import org.apache.doris.flink.util.ErrorMessages;
@@ -57,7 +56,7 @@ public class BackendClient {
     private final int socketTimeout;
     private final int connectTimeout;
 
-    public BackendClient(Routing routing, DorisReadOptions readOptions) throws ConnectedFailedException {
+    public BackendClient(Routing routing, DorisReadOptions readOptions) {
         this.routing = routing;
         this.connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
         this.socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
@@ -67,7 +66,7 @@ public class BackendClient {
         open();
     }
 
-    private void open() throws ConnectedFailedException {
+    private void open() {
         logger.debug("Open client to Doris BE '{}'.", routing);
         TException ex = null;
         for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
@@ -117,7 +116,7 @@ public class BackendClient {
      * @return scan open result
      * @throws ConnectedFailedException thrown if it cannot connect to Doris BE
      */
-    public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException {
+    public TScanOpenResult openScanner(TScanOpenParams openParams) {
         logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
         if (!isConnected) {
             open();
@@ -153,7 +152,7 @@ public class BackendClient {
      * @return scan batch result
      * @throws ConnectedFailedException thrown if it cannot connect to Doris BE
      */
-    public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
+    public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) {
         logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
         if (!isConnected) {
             open();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2daf5e1..102a7ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -73,7 +73,7 @@ public class DorisExecutionOptions implements Serializable {
     public static DorisExecutionOptions defaults() {
         Properties properties = new Properties();
         properties.setProperty("format", "json");
-        properties.setProperty("strip_outer_array", "true");
+        properties.setProperty("read_json_by_line", "true");
         return new Builder().setStreamLoadProp(properties).build();
     }
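
Note: the default stream load payload switches from a single outer JSON array (strip_outer_array) to line-delimited JSON (read_json_by_line). A minimal sketch of setting the same properties by hand, assuming the Builder shown above is reachable as DorisExecutionOptions.Builder:

    Properties props = new Properties();
    props.setProperty("format", "json");
    // one JSON object per line instead of one outer JSON array
    props.setProperty("read_json_by_line", "true");
    DorisExecutionOptions executionOptions =
            new DorisExecutionOptions.Builder().setStreamLoadProp(props).build();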
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 1957139..fc5fd14 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -23,18 +23,18 @@ import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.reader.DorisValueReader;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
-
 /**
  * DorisSource
  **/
@@ -48,7 +48,7 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp
     private final DorisReadOptions readOptions;
     private transient volatile boolean isRunning;
     private List<PartitionDefinition> dorisPartitions;
-    private List<PartitionDefinition> taskDorisPartitions = Lists.newArrayList();
+    private List<PartitionDefinition> taskDorisPartitions = new ArrayList<>();
 
     public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
@@ -87,11 +87,13 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp
     @Override
     public void run(SourceContext<List<?>> sourceContext) {
         for (PartitionDefinition partitions : taskDorisPartitions) {
-            try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
-                while (isRunning && scalaValueReader.hasNext()) {
-                    List<?> next = scalaValueReader.next();
+            try (DorisValueReader valueReader = new DorisValueReader(partitions, options, readOptions)) {
+                while (isRunning && valueReader.hasNext()) {
+                    List<?> next = valueReader.next();
                     sourceContext.collect(next);
                 }
+            } catch (Exception e) {
+                logger.error("close reader resource failed,", e);
             }
         }
     }
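
A hypothetical usage sketch of the reworked source on Flink 1.15; SimpleListDeserializationSchema and the property keys are assumptions to be checked against the connector, while the constructor matches this diff:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties props = new Properties();
    props.setProperty("fenodes", "127.0.0.1:8030");
    props.setProperty("table.identifier", "db.table");
    props.setProperty("username", "root");
    props.setProperty("password", "");
    env.addSource(new DorisSourceFunction(
                new DorisStreamOptions(props),
                new SimpleListDeserializationSchema()))
            .print();
    env.execute("doris-source-example");
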
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
index e25d1a5..6f755b7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.flink.exception;
 
-public class ConnectedFailedException extends DorisException {
+public class ConnectedFailedException extends DorisRuntimeException {
     public ConnectedFailedException(String server, Throwable cause) {
         super("Connect to " + server + "failed.", cause);
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
index eadd860..e6756a4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
@@ -21,7 +21,7 @@ import org.apache.doris.thrift.TStatusCode;
 
 import java.util.List;
 
-public class DorisInternalException extends DorisException {
+public class DorisInternalException extends DorisRuntimeException {
     public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) {
         super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
index a26718d..81af673 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
@@ -17,5 +17,5 @@
 
 package org.apache.doris.flink.exception;
 
-public class ShouldNeverHappenException extends DorisException {
+public class ShouldNeverHappenException extends DorisRuntimeException {
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 734bfdb..ff03e01 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -20,16 +20,15 @@ package org.apache.doris.flink.rest;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.io.IOUtils;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.ConfigurationOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.ConnectedFailedException;
 import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.exception.ShouldNeverHappenException;
 import org.apache.doris.flink.rest.models.Backend;
 import org.apache.doris.flink.rest.models.BackendRow;
@@ -37,14 +36,13 @@ import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
-import org.apache.flink.calcite.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.StringEntity;
-
 import org.slf4j.Logger;
 
 import java.io.BufferedReader;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 4dd6732..e83300a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -33,6 +33,7 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.flink.util.Preconditions;
@@ -99,7 +100,7 @@ public class RowBatch {
         this.offsetInRowBatch = 0;
     }
 
-    public RowBatch readArrow() throws DorisException {
+    public RowBatch readArrow() {
         try {
             this.root = arrowStreamReader.getVectorSchemaRoot();
             while (arrowStreamReader.loadNextBatch()) {
@@ -124,7 +125,7 @@ public class RowBatch {
             return this;
         } catch (Exception e) {
             logger.error("Read Doris Data failed because: ", e);
-            throw new DorisException(e.getMessage());
+            throw new DorisRuntimeException(e.getMessage());
         } finally {
             close();
         }
@@ -304,7 +305,7 @@ public class RowBatch {
         }
     }
 
-    public List<Object> next() throws DorisException {
+    public List<Object> next() {
         if (!hasNext()) {
             String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
             logger.error(errMsg);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index aadf6fd..c5d33f7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -18,7 +18,6 @@ package org.apache.doris.flink.source.reader;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.datastream.ScalaValueReader;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
 import org.apache.doris.flink.source.split.DorisSplitRecords;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -43,7 +42,7 @@ public class DorisSourceSplitReader
     private final Queue<DorisSourceSplit> splits;
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
-    private ScalaValueReader scalaValueReader;
+    private DorisValueReader valueReader;
     private String currentSplitId;
 
     public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) {
@@ -56,14 +55,14 @@ public class DorisSourceSplitReader
     public RecordsWithSplitIds<List> fetch() throws IOException {
         checkSplitOrStartNext();
 
-        if (!scalaValueReader.hasNext()) {
+        if (!valueReader.hasNext()) {
             return finishSplit();
         }
-        return DorisSplitRecords.forRecords(currentSplitId, scalaValueReader);
+        return DorisSplitRecords.forRecords(currentSplitId, valueReader);
     }
 
     private void checkSplitOrStartNext() throws IOException {
-        if (scalaValueReader != null) {
+        if (valueReader != null) {
             return;
         }
         final DorisSourceSplit nextSplit = splits.poll();
@@ -71,13 +70,17 @@ public class DorisSourceSplitReader
             throw new IOException("Cannot fetch from another split - no split remaining");
         }
         currentSplitId = nextSplit.splitId();
-        scalaValueReader = new ScalaValueReader(nextSplit.getPartitionDefinition(), options, readOptions);
+        valueReader = new DorisValueReader(nextSplit.getPartitionDefinition(), options, readOptions);
     }
 
     private DorisSplitRecords finishSplit() {
-        if (scalaValueReader != null) {
-            scalaValueReader.close();
-            scalaValueReader = null;
+        if (valueReader != null) {
+            try {
+                valueReader.close();
+            } catch (Exception e) {
+                LOG.error("close resource reader failed,", e);
+            }
+            valueReader = null;
         }
         final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId);
         currentSplitId = null;
@@ -96,8 +99,8 @@ public class DorisSourceSplitReader
 
     @Override
     public void close() throws Exception {
-        if (scalaValueReader != null) {
-            scalaValueReader.close();
+        if (valueReader != null) {
+            valueReader.close();
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
new file mode 100644
index 0000000..173ea90
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.backend.BackendClient;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.exception.ShouldNeverHappenException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.SchemaUtils;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.serialization.Routing;
+import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.doris.thrift.TScanBatchResult;
+import org.apache.doris.thrift.TScanCloseParams;
+import org.apache.doris.thrift.TScanNextBatchParams;
+import org.apache.doris.thrift.TScanOpenParams;
+import org.apache.doris.thrift.TScanOpenResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
+
+public class DorisValueReader implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class);
+    protected BackendClient client;
+    private PartitionDefinition partition;
+    private DorisOptions options;
+    private DorisReadOptions readOptions;
+
+    protected int offset = 0;
+    protected AtomicBoolean eos = new AtomicBoolean(false);
+    protected RowBatch rowBatch;
+
+    // flag indicating whether Arrow is deserialized to RowBatch asynchronously
+    protected Boolean deserializeArrowToRowBatchAsync;
+
+    protected BlockingQueue<RowBatch> rowBatchBlockingQueue;
+    private TScanOpenParams openParams;
+    protected String contextId;
+    protected Schema schema;
+    protected boolean asyncThreadStarted;
+
+    public DorisValueReader(PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
+        this.partition = partition;
+        this.options = options;
+        this.readOptions = readOptions;
+        this.client = backendClient();
+        this.deserializeArrowToRowBatchAsync = readOptions.getDeserializeArrowAsync() == null ? DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT : readOptions.getDeserializeArrowAsync();
+
+        Integer blockingQueueSize = readOptions.getDeserializeQueueSize() == null ? DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT : readOptions.getDeserializeQueueSize();
+        if (this.deserializeArrowToRowBatchAsync) {
+            this.rowBatchBlockingQueue = new ArrayBlockingQueue(blockingQueueSize);
+        }
+        init();
+    }
+
+    private void init() {
+        this.openParams = openParams();
+        TScanOpenResult openResult = this.client.openScanner(this.openParams);
+        this.contextId = openResult.getContextId();
+        this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+        this.asyncThreadStarted = asyncThreadStarted();
+        LOG.debug("Open scan result is, contextId: {}, schema: {}.", contextId, schema);
+    }
+
+    private BackendClient backendClient() {
+        try {
+            return new BackendClient(new Routing(partition.getBeAddress()), readOptions);
+        } catch (IllegalArgumentException e) {
+            LOG.error("init backend:{} client failed,", partition.getBeAddress(), e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    private TScanOpenParams openParams() {
+        TScanOpenParams params = new TScanOpenParams();
+        params.cluster = DORIS_DEFAULT_CLUSTER;
+        params.database = partition.getDatabase();
+        params.table = partition.getTable();
+
+        params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[]{}));
+        params.opaqued_query_plan = partition.getQueryPlan();
+        // maximum number of rows in one read batch
+        Integer batchSize = readOptions.getRequestBatchSize() == null ? DORIS_BATCH_SIZE_DEFAULT : readOptions.getRequestBatchSize();
+        Integer queryDorisTimeout = readOptions.getRequestQueryTimeoutS() == null ? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT : readOptions.getRequestQueryTimeoutS();
+        Long execMemLimit = readOptions.getExecMemLimit() == null ? DORIS_EXEC_MEM_LIMIT_DEFAULT : readOptions.getExecMemLimit();
+        params.setBatchSize(batchSize);
+        params.setQueryTimeout(queryDorisTimeout);
+        params.setMemLimit(execMemLimit);
+        params.setUser(options.getUsername());
+        params.setPasswd(options.getPassword());
+        LOG.debug("Open scan params is,cluster:{},database:{},table:{},tabletId:{},batch size:{},query timeout:{},execution memory limit:{},user:{},query plan: {}",
+                params.getCluster(), params.getDatabase(), params.getTable(), params.getTabletIds(), params.getBatchSize(), params.getQueryTimeout(), params.getMemLimit(), params.getUser(), params.getOpaquedQueryPlan());
+        return params;
+    }
+
+    protected Thread asyncThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+            TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
+            nextBatchParams.setContextId(contextId);
+            while (!eos.get()) {
+                nextBatchParams.setOffset(offset);
+                TScanBatchResult nextResult = client.getNext(nextBatchParams);
+                eos.set(nextResult.isEos());
+                if (!eos.get()) {
+                    RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow();
+                    offset += rowBatch.getReadRowCount();
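+                    // rows were already materialized by readArrow(), so the batch can be closed before it is queued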
+                    rowBatch.close();
+                    try {
+                        rowBatchBlockingQueue.put(rowBatch);
+                    } catch (InterruptedException e) {
+                        throw new DorisRuntimeException(e);
+                    }
+                }
+            }
+        }
+    });
+
+    protected boolean asyncThreadStarted() {
+        boolean started = false;
+        if (deserializeArrowToRowBatchAsync) {
+            asyncThread.start();
+            started = true;
+        }
+        return started;
+    }
+
+    /**
+     * read data and cache it in rowBatch.
+     *
+     * @return true if there is a next value
+     */
+    public boolean hasNext() {
+        boolean hasNext = false;
+        if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
+            // deserialize Arrow to RowBatch asynchronously
+            if (rowBatch == null || !rowBatch.hasNext()) {
+                while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) {
+                    if (!rowBatchBlockingQueue.isEmpty()) {
+                        try {
+                            rowBatch = rowBatchBlockingQueue.take();
+                        } catch (InterruptedException e) {
+                            throw new DorisRuntimeException(e);
+                        }
+                        hasNext = true;
+                        break;
+                    } else {
+                        // wait for a rowBatch to be queued or for eos to change
+                        try {
+                            Thread.sleep(5);
+                        } catch (InterruptedException e) {
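+                            // interrupt ignored: eos and the queue are re-checked on the next loop pass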
+                        }
+                    }
+                }
+            } else {
+                hasNext = true;
+            }
+        } else {
+            // Arrow data is fetched synchronously while iterating
+            if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+                if (rowBatch != null) {
+                    offset += rowBatch.getReadRowCount();
+                    rowBatch.close();
+                }
+                TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
+                nextBatchParams.setContextId(contextId);
+                nextBatchParams.setOffset(offset);
+                TScanBatchResult nextResult = client.getNext(nextBatchParams);
+                eos.set(nextResult.isEos());
+                if (!eos.get()) {
+                    rowBatch = new RowBatch(nextResult, schema).readArrow();
+                }
+            }
+            hasNext = !eos.get();
+        }
+        return hasNext;
+    }
+
+    /**
+     * get next value.
+     *
+     * @return next value
+     */
+    public List next() {
+        if (!hasNext()) {
+            LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        return rowBatch.next();
+    }
+
+    @Override
+    public void close() throws Exception {
+        TScanCloseParams closeParams = new TScanCloseParams();
+        closeParams.setContextId(contextId);
+        client.closeScanner(closeParams);
+    }
+}
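
For reference, a minimal sketch of driving the new reader directly (partition, options and readOptions are assumed to be already built; in the connector it is reached through DorisSourceFunction or DorisSourceSplitReader as shown above):

    try (DorisValueReader reader = new DorisValueReader(partition, options, readOptions)) {
        while (reader.hasNext()) {
            List<?> row = reader.next(); // one row as a list of column values
            // process row ...
        }
    } // close() sends TScanCloseParams to release the BE scan context
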
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
index 6f02446..9d1bbd7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -16,7 +16,7 @@
 // under the License.
 package org.apache.doris.flink.source.split;
 
-import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.doris.flink.source.reader.DorisValueReader;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import javax.annotation.Nullable;
@@ -26,25 +26,25 @@ import java.util.Set;
 
 /**
  * An implementation of {@link RecordsWithSplitIds}.
- * This is essentially a slim wrapper around the {@link ScalaValueReader} that only adds
+ * This is essentially a slim wrapper around the {@link DorisValueReader} that only adds
  * information about the current split, or finished splits
  */
 public class DorisSplitRecords implements RecordsWithSplitIds<List> {
 
     private final Set<String> finishedSplits;
-    private final ScalaValueReader scalaValueReader;
+    private final DorisValueReader valueReader;
     private String splitId;
 
     public DorisSplitRecords(String splitId,
-                             ScalaValueReader scalaValueReader,
+                             DorisValueReader valueReader,
                              Set<String> finishedSplits) {
         this.splitId = splitId;
-        this.scalaValueReader = scalaValueReader;
+        this.valueReader = valueReader;
         this.finishedSplits = finishedSplits;
     }
 
     public static DorisSplitRecords forRecords(
-            final String splitId, final ScalaValueReader valueReader) {
+            final String splitId, final DorisValueReader valueReader) {
         return new DorisSplitRecords(splitId, valueReader, Collections.emptySet());
     }
 
@@ -58,7 +58,7 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> {
         // move the split one (from current value to null)
         final String nextSplit = this.splitId;
         this.splitId = null;
-        if (scalaValueReader == null || !scalaValueReader.hasNext()) {
+        if (valueReader == null || !valueReader.hasNext()) {
             return null;
         }
         return nextSplit;
@@ -67,8 +67,8 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> {
     @Nullable
     @Override
     public List nextRecordFromSplit() {
-        if (scalaValueReader != null && scalaValueReader.hasNext()) {
-            List next = scalaValueReader.next();
+        if (valueReader != null && valueReader.hasNext()) {
+            List next = valueReader.next();
             return next;
         }
         return null;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index be00cff..fb44359 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,7 +147,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
     private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
             .key("sink.label-prefix")
             .stringType()
-            .noDefaultValue()
+            .defaultValue("")
             .withDescription("the unique label prefix.");
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
             .key("sink.batch.interval")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 16cf2ee..7181ce6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -18,9 +18,9 @@ package org.apache.doris.flink.table;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.datastream.ScalaValueReader;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.source.reader.DorisValueReader;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
@@ -30,17 +30,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.sql.PreparedStatement;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,7 +54,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
     private List<PartitionDefinition> dorisPartitions;
     private TypeInformation<RowData> rowDataTypeInfo;
 
-    private ScalaValueReader scalaValueReader;
+    private DorisValueReader valueReader;
     private transient boolean hasNext;
 
     private final DorisRowConverter rowConverter;
@@ -105,8 +100,8 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
      */
     @Override
     public void open(DorisTableInputSplit inputSplit) throws IOException {
-        scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions);
-        hasNext = scalaValueReader.hasNext();
+        valueReader = new DorisValueReader(inputSplit.partition, options, readOptions);
+        hasNext = valueReader.hasNext();
     }
 
     /**
@@ -147,27 +142,13 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
         if (!hasNext) {
             return null;
         }
-        List next = (List) scalaValueReader.next();
+        List next = valueReader.next();
         RowData genericRowData = rowConverter.convertInternal(next);
         //update hasNext after we've read the record
-        hasNext = scalaValueReader.hasNext();
+        hasNext = valueReader.hasNext();
         return genericRowData;
     }
 
-    private Object deserialize(LogicalType type, Object val) {
-        switch (type.getTypeRoot()) {
-            case DECIMAL:
-                final DecimalType decimalType = ((DecimalType) type);
-                final int precision = decimalType.getPrecision();
-                final int scala = decimalType.getScale();
-                return DecimalData.fromBigDecimal((BigDecimal) val, precision, scala);
-            case VARCHAR:
-                return StringData.fromString((String) val);
-            default:
-                return val;
-        }
-    }
-
     @Override
     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
         return cachedStatistics;
@@ -249,7 +230,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
 
         public DorisRowDataInputFormat build() {
             return new DorisRowDataInputFormat(
-                optionsBuilder.build(), partitions, readOptions, rowType
+                    optionsBuilder.build(), partitions, readOptions, rowType
             );
         }
     }
diff --git a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
deleted file mode 100644
index 06df2ef..0000000
--- a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.datastream
-
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.doris.flink.backend.BackendClient
-import org.apache.doris.flink.cfg.ConfigurationOptions._
-import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions}
-import org.apache.doris.flink.exception.ShouldNeverHappenException
-import org.apache.doris.flink.rest.{PartitionDefinition, SchemaUtils}
-import org.apache.doris.flink.rest.models.Schema
-import org.apache.doris.flink.serialization.{Routing, RowBatch}
-import org.apache.doris.flink.util.ErrorMessages
-import org.apache.doris.flink.util.ErrorMessages._
-import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-import scala.util.control.Breaks
-
-/**
- * read data from Doris BE to array.
- * @param partition Doris RDD partition
- * @param options request configuration
- */
-class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) extends AutoCloseable {
-  protected val logger = Logger.getLogger(classOf[ScalaValueReader])
-
-  protected val client = new BackendClient(new Routing(partition.getBeAddress), readOptions)
-  protected var offset = 0
-  protected var eos: AtomicBoolean = new AtomicBoolean(false)
-  protected var rowBatch: RowBatch = _
-  // flag indicate if support deserialize Arrow to RowBatch asynchronously
-  protected var deserializeArrowToRowBatchAsync: java.lang.Boolean = Try {
-    if(readOptions.getDeserializeArrowAsync == null ) DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT else readOptions.getDeserializeArrowAsync
-  } getOrElse {
-    logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, readOptions.getDeserializeArrowAsync)
-    DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
-  }
-
-  protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
-    val blockingQueueSize = Try {
-      if(readOptions.getDeserializeQueueSize == null) DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT else readOptions.getDeserializeQueueSize
-    } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, readOptions.getDeserializeQueueSize)
-      DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
-    }
-
-    var queue: BlockingQueue[RowBatch] = null
-    if (deserializeArrowToRowBatchAsync) {
-      queue = new ArrayBlockingQueue(blockingQueueSize)
-    }
-    queue
-  }
-
-  private val openParams: TScanOpenParams = {
-    val params = new TScanOpenParams
-    params.cluster = DORIS_DEFAULT_CLUSTER
-    params.database = partition.getDatabase
-    params.table = partition.getTable
-
-    params.tablet_ids = partition.getTabletIds.toList
-    params.opaqued_query_plan = partition.getQueryPlan
-
-    // max row number of one read batch
-    val batchSize = Try {
-      if(readOptions.getRequestBatchSize == null)  DORIS_BATCH_SIZE_DEFAULT else readOptions.getRequestBatchSize;
-    } getOrElse {
-        logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, readOptions.getRequestBatchSize)
-        DORIS_BATCH_SIZE_DEFAULT
-    }
-
-    val queryDorisTimeout = Try {
-      if(readOptions.getRequestQueryTimeoutS == null) DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT else readOptions.getRequestQueryTimeoutS
-    } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, readOptions.getRequestQueryTimeoutS)
-      DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
-    }
-
-    val execMemLimit = Try {
-      if(readOptions.getExecMemLimit == null) DORIS_EXEC_MEM_LIMIT_DEFAULT else readOptions.getExecMemLimit
-    } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, readOptions.getExecMemLimit)
-      DORIS_EXEC_MEM_LIMIT_DEFAULT
-    }
-
-    params.setBatchSize(batchSize)
-    params.setQueryTimeout(queryDorisTimeout)
-    params.setMemLimit(execMemLimit)
-    params.setUser(options.getUsername)
-    params.setPasswd(options.getPassword)
-
-    logger.debug(s"Open scan params is, " +
-        s"cluster: ${params.getCluster}, " +
-        s"database: ${params.getDatabase}, " +
-        s"table: ${params.getTable}, " +
-        s"tabletId: ${params.getTabletIds}, " +
-        s"batch size: $batchSize, " +
-        s"query timeout: $queryDorisTimeout, " +
-        s"execution memory limit: $execMemLimit, " +
-        s"user: ${params.getUser}, " +
-        s"query plan: ${params.getOpaquedQueryPlan}")
-
-    params
-  }
-
-  protected val openResult: TScanOpenResult = client.openScanner(openParams)
-  protected val contextId: String = openResult.getContextId
-  protected val schema: Schema =
-    SchemaUtils.convertToSchema(openResult.getSelectedColumns)
-
-  protected val asyncThread: Thread = new Thread {
-    override def run {
-      val nextBatchParams = new TScanNextBatchParams
-      nextBatchParams.setContextId(contextId)
-      while (!eos.get) {
-        nextBatchParams.setOffset(offset)
-        val nextResult = client.getNext(nextBatchParams)
-        eos.set(nextResult.isEos)
-        if (!eos.get) {
-          val rowBatch = new RowBatch(nextResult, schema).readArrow()
-          offset += rowBatch.getReadRowCount
-          rowBatch.close
-          rowBatchBlockingQueue.put(rowBatch)
-        }
-      }
-    }
-  }
-
-  protected val asyncThreadStarted: Boolean = {
-    var started = false
-    if (deserializeArrowToRowBatchAsync) {
-      asyncThread.start
-      started = true
-    }
-    started
-  }
-
-  logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
-
-  /**
-   * read data and cached in rowBatch.
-   * @return true if hax next value
-   */
-  def hasNext: Boolean = {
-    var hasNext = false
-    if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
-      // support deserialize Arrow to RowBatch asynchronously
-      if (rowBatch == null || !rowBatch.hasNext) {
-        val loop = new Breaks
-        loop.breakable {
-          while (!eos.get || !rowBatchBlockingQueue.isEmpty) {
-            if (!rowBatchBlockingQueue.isEmpty) {
-              rowBatch = rowBatchBlockingQueue.take
-              hasNext = true
-              loop.break
-            } else {
-              // wait for rowBatch put in queue or eos change
-              Thread.sleep(5)
-            }
-          }
-        }
-      } else {
-        hasNext = true
-      }
-    } else {
-      // Arrow data was acquired synchronously during the iterative process
-      if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
-        if (rowBatch != null) {
-          offset += rowBatch.getReadRowCount
-          rowBatch.close
-        }
-        val nextBatchParams = new TScanNextBatchParams
-        nextBatchParams.setContextId(contextId)
-        nextBatchParams.setOffset(offset)
-        val nextResult = client.getNext(nextBatchParams)
-        eos.set(nextResult.isEos)
-        if (!eos.get) {
-          rowBatch = new RowBatch(nextResult, schema).readArrow()
-        }
-      }
-      hasNext = !eos.get
-    }
-    hasNext
-  }
-
-  /**
-   * get next value.
-   * @return next value
-   */
-  def next: java.util.List[_] = {
-    if (!hasNext) {
-      logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
-      throw new ShouldNeverHappenException
-    }
-    rowBatch.next
-  }
-
-  def close(): Unit = {
-    val closeParams = new TScanCloseParams
-    closeParams.setContextId(contextId)
-    client.closeScanner(closeParams)
-  }
-
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index 60524c8..dae75f0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -24,7 +24,6 @@ public class DorisSourceSinkExample {
 
     public static void main(String[] args) {
         EnvironmentSettings settings = EnvironmentSettings.newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         TableEnvironment tEnv = TableEnvironment.create(settings);
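
In Flink 1.15 the Blink planner is the only planner, so useBlinkPlanner() no longer exists on EnvironmentSettings; the builder chain above is the supported form. Equivalently, assuming the 1.15 static factory:

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
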
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 261acbe..8e7368c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -41,8 +41,6 @@ import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
-import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.table.data.DecimalData;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -55,6 +53,7 @@ import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -70,7 +69,7 @@ public class TestRowBatch {
     @Test
     public void testRowBatch() throws Exception {
         // schema
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        List<Field> childrenBuilder = new ArrayList<>();
         childrenBuilder.add(new Field("k0", FieldType.nullable(new ArrowType.Bool()), null));
         childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Int(8, true)), null));
         childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Int(16, true)), null));
@@ -84,7 +83,7 @@ public class TestRowBatch {
         childrenBuilder.add(new Field("k6", FieldType.nullable(new ArrowType.Utf8()), null));
 
         VectorSchemaRoot root = VectorSchemaRoot.create(
-                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
@@ -241,7 +240,7 @@ public class TestRowBatch {
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
 
-        List<Object> expectedRow1 = Lists.newArrayList(
+        List<Object> expectedRow1 = Arrays.asList(
                 Boolean.TRUE,
                 (byte) 1,
                 (short) 1,
@@ -310,11 +309,11 @@ public class TestRowBatch {
         byte[] binaryRow1 = {'d', 'e', 'f'};
         byte[] binaryRow2 = {'g', 'h', 'i'};
 
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        List<Field> childrenBuilder = new ArrayList<>();
         childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Binary()), null));
 
         VectorSchemaRoot root = VectorSchemaRoot.create(
-                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
@@ -378,11 +377,11 @@ public class TestRowBatch {
 
     @Test
     public void testDecimalV2() throws Exception {
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        List<Field> childrenBuilder = new ArrayList<>();
         childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Decimal(27, 9)), null));
 
         VectorSchemaRoot root = VectorSchemaRoot.create(
-                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(

