You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/04/10 01:33:06 UTC

[doris-flink-connector] branch master updated: fix source when query plan is too long (#128)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c32479  fix source when query plan is too long (#128)
5c32479 is described below

commit 5c32479f128ed057a0507a6ec9f263069afa5a4b
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 10 09:32:59 2023 +0800

    fix source when query plan is too long (#128)
    
    Co-authored-by: wudi <>
---
 .../flink/source/split/DorisSourceSplitSerializer.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
index d667ed6..6348686 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -76,7 +77,10 @@ public class DorisSourceSplitSerializer
         out.writeUTF(partDef.getTable());
         out.writeUTF(partDef.getBeAddress());
         writeLongArray(out, partDef.getTabletIds().toArray(new Long[]{}));
-        out.writeUTF(partDef.getQueryPlan());
+        //writeUTF has a length limit, but the query plan is sometimes very long
+        final byte[] queryPlanBytes = partDef.getQueryPlan().getBytes(StandardCharsets.UTF_8);
+        out.writeInt(queryPlanBytes.length);
+        out.write(queryPlanBytes);
 
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
@@ -103,7 +107,12 @@ public class DorisSourceSplitSerializer
         final String beAddress = in.readUTF();
         Long[] vals = readLongArray(in);
         final Set<Long> tabletIds = new HashSet<>(Arrays.asList(vals));
-        final String queryPlan = in.readUTF();
+
+        //read query plan
+        final int len = in.readInt();
+        final byte[] bytes = new byte[len];
+        in.read(bytes);
+        final String queryPlan = new String(bytes, StandardCharsets.UTF_8);
         PartitionDefinition partDef = new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
         return new DorisSourceSplit(partDef);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org