You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/01/28 03:16:46 UTC
[doris-spark-connector] branch master updated: [improvement] Adapt to the load format of csv (#65)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb76ed [improvement] Adapt to the load format of csv (#65)
0bb76ed is described below
commit 0bb76ed0480398583b2541596fece161ea402984
Author: Hong Liu <84...@qq.com>
AuthorDate: Sat Jan 28 11:16:40 2023 +0800
[improvement] Adapt to the load format of csv (#65)
Co-authored-by: smallhibiscus <844981280>
---
.../org/apache/doris/spark/DorisStreamLoad.java | 56 ++++++++++++++--------
.../doris/spark/sql/DorisSourceProvider.scala | 2 +-
2 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 351ef23..3ada398 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,10 +50,10 @@ import java.util.concurrent.TimeUnit;
/**
* DorisStreamLoad
**/
-public class DorisStreamLoad implements Serializable {
- public static final String FIELD_DELIMITER = "\t";
- public static final String LINE_DELIMITER = "\n";
- public static final String NULL_VALUE = "\\N";
+public class DorisStreamLoad implements Serializable{
+ private String FIELD_DELIMITER;
+ private String LINE_DELIMITER;
+ private String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -71,6 +71,7 @@ public class DorisStreamLoad implements Serializable {
private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+ private String fileType;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.db = db;
@@ -114,6 +115,11 @@ public class DorisStreamLoad implements Serializable {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
+ fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+ if (fileType.equals("csv")){
+ FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+ LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+ }
}
public String getLoadUrlStr() {
@@ -156,8 +162,10 @@ public class DorisStreamLoad implements Serializable {
conn.addRequestProperty(k, v);
});
}
- conn.addRequestProperty("format", "json");
- conn.addRequestProperty("strip_outer_array", "true");
+ if (fileType.equals("json")){
+ conn.addRequestProperty("format", "json");
+ conn.addRequestProperty("strip_outer_array", "true");
+ }
return conn;
}
@@ -200,24 +208,30 @@ public class DorisStreamLoad implements Serializable {
public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
- List<Map<Object, Object>> dataList = new ArrayList<>();
- try {
- for (List<Object> row : rows) {
- Map<Object, Object> dataMap = new HashMap<>();
- if (dfColumns.length == row.size()) {
- for (int i = 0; i < dfColumns.length; i++) {
- dataMap.put(dfColumns[i], row.get(i));
+ if (fileType.equals("csv")) {
+ load(listToString(rows));
+ } else if(fileType.equals("json")) {
+ List<Map<Object, Object>> dataList = new ArrayList<>();
+ try {
+ for (List<Object> row : rows) {
+ Map<Object, Object> dataMap = new HashMap<>();
+ if (dfColumns.length == row.size()) {
+ for (int i = 0; i < dfColumns.length; i++) {
+ dataMap.put(dfColumns[i], row.get(i));
+ }
}
+ dataList.add(dataMap);
}
- dataList.add(dataMap);
+ } catch (Exception e) {
+ throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
}
- } catch (Exception e) {
- throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
- }
- // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
- List<String> serializedList = ListUtils.getSerializedList(dataList);
- for (String serializedRows : serializedList) {
- load(serializedRows);
+ // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
+ List<String> serializedList = ListUtils.getSerializedList(dataList);
+ for (String serializedRows : serializedList) {
+ load(serializedRows);
+ }
+ } else {
+ throw new StreamLoadException("Not support the file format in stream load.");
}
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 2922d63..0399acf 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -83,7 +83,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
line.add(field.asInstanceOf[AnyRef])
}
rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount) {
+ if (rowsBuffer.size > maxRowCount - 1 ) {
flush
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org