You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/04/10 01:33:06 UTC
[doris-flink-connector] branch master updated: fix source when query plan is too long (#128)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5c32479 fix source when query plan is too long (#128)
5c32479 is described below
commit 5c32479f128ed057a0507a6ec9f263069afa5a4b
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 10 09:32:59 2023 +0800
fix source when query plan is too long (#128)
Co-authored-by: wudi <>
---
.../flink/source/split/DorisSourceSplitSerializer.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
index d667ed6..6348686 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -76,7 +77,10 @@ public class DorisSourceSplitSerializer
out.writeUTF(partDef.getTable());
out.writeUTF(partDef.getBeAddress());
writeLongArray(out, partDef.getTabletIds().toArray(new Long[]{}));
- out.writeUTF(partDef.getQueryPlan());
+ //writeUTF has a length limit, but the query plan is sometimes very long
+ final byte[] queryPlanBytes = partDef.getQueryPlan().getBytes(StandardCharsets.UTF_8);
+ out.writeInt(queryPlanBytes.length);
+ out.write(queryPlanBytes);
final byte[] result = out.getCopyOfBuffer();
out.clear();
@@ -103,7 +107,12 @@ public class DorisSourceSplitSerializer
final String beAddress = in.readUTF();
Long[] vals = readLongArray(in);
final Set<Long> tabletIds = new HashSet<>(Arrays.asList(vals));
- final String queryPlan = in.readUTF();
+
+ //read query plan
+ final int len = in.readInt();
+ final byte[] bytes = new byte[len];
+ in.read(bytes);
+ final String queryPlan = new String(bytes, StandardCharsets.UTF_8);
PartitionDefinition partDef = new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
return new DorisSourceSplit(partDef);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org