You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/01/28 02:17:39 UTC
[doris-spark-connector] branch master updated: [improve]Return specific error information after optimizing the exception (#64)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 97d2222 [improve]Return specific error information after optimizing the exception (#64)
97d2222 is described below
commit 97d2222153f57db4b5616b36c18a7711e5ebcbe4
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Sat Jan 28 10:17:34 2023 +0800
[improve]Return specific error information after optimizing the exception (#64)
* Improve exception handling so that a failed stream load returns specific error information
---
.../org/apache/doris/spark/DorisStreamLoad.java | 49 +++++++++++-----------
.../doris/spark/sql/TestSparkConnector.scala | 4 +-
2 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 25ed7b1..351ef23 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
/**
* DorisStreamLoad
**/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
public static final String FIELD_DELIMITER = "\t";
public static final String LINE_DELIMITER = "\n";
public static final String NULL_VALUE = "\\N";
@@ -68,7 +68,7 @@ public class DorisStreamLoad implements Serializable{
private String columns;
private String[] dfColumns;
private String maxFilterRatio;
- private Map<String,String> streamLoadProp;
+ private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
@@ -91,7 +91,7 @@ public class DorisStreamLoad implements Serializable{
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
- this.streamLoadProp=getStreamLoadProp(settings);
+ this.streamLoadProp = getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
@@ -110,7 +110,7 @@ public class DorisStreamLoad implements Serializable{
this.dfColumns = dfColumns;
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
- this.streamLoadProp=getStreamLoadProp(settings);
+ this.streamLoadProp = getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
@@ -142,15 +142,15 @@ public class DorisStreamLoad implements Serializable{
conn.setDoOutput(true);
conn.setDoInput(true);
- if(streamLoadProp != null ){
- streamLoadProp.forEach((k,v) -> {
- if(streamLoadProp.containsKey("format")){
+ if (streamLoadProp != null) {
+ streamLoadProp.forEach((k, v) -> {
+ if (streamLoadProp.containsKey("format")) {
return;
}
- if(streamLoadProp.containsKey("strip_outer_array")) {
+ if (streamLoadProp.containsKey("strip_outer_array")) {
return;
}
- if(streamLoadProp.containsKey("read_json_by_line")){
+ if (streamLoadProp.containsKey("read_json_by_line")) {
return;
}
conn.addRequestProperty(k, v);
@@ -171,6 +171,7 @@ public class DorisStreamLoad implements Serializable{
this.respMsg = respMsg;
this.respContent = respContent;
}
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -199,10 +200,10 @@ public class DorisStreamLoad implements Serializable{
public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
- List<Map<Object,Object>> dataList = new ArrayList<>();
+ List<Map<Object, Object>> dataList = new ArrayList<>();
try {
for (List<Object> row : rows) {
- Map<Object,Object> dataMap = new HashMap<>();
+ Map<Object, Object> dataMap = new HashMap<>();
if (dfColumns.length == row.size()) {
for (int i = 0; i < dfColumns.length; i++) {
dataMap.put(dfColumns[i], row.get(i));
@@ -222,18 +223,18 @@ public class DorisStreamLoad implements Serializable{
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
- if(loadResponse.status != 200){
- LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
+ if (loadResponse.status != 200) {
+ LOG.info("Streamload Response HTTP Status Error:{}", loadResponse);
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
- }else{
- LOG.info("Streamload Response:{}",loadResponse);
+ } else {
ObjectMapper obj = new ObjectMapper();
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
- if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
- LOG.info("Streamload Response RES STATUS Error:{}", loadResponse);
- throw new StreamLoadException("stream load error: " + respContent.getMessage());
+ if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ LOG.error("Streamload Response RES STATUS Error:{}", loadResponse);
+ throw new StreamLoadException("stream load error: " + loadResponse);
}
+ LOG.info("Streamload Response:{}", loadResponse);
} catch (IOException e) {
throw new StreamLoadException(e);
}
@@ -277,7 +278,7 @@ public class DorisStreamLoad implements Serializable{
} catch (Exception e) {
e.printStackTrace();
- String err = "http request exception,load url : "+loadUrlStr+",failed to execute spark streamload with label: " + label;
+ String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label;
LOG.warn(err, e);
return new LoadResponse(status, e.getMessage(), err);
} finally {
@@ -290,13 +291,13 @@ public class DorisStreamLoad implements Serializable{
}
}
- public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){
- Map<String,String> streamLoadPropMap = new HashMap<>();
+ public Map<String, String> getStreamLoadProp(SparkSettings sparkSettings) {
+ Map<String, String> streamLoadPropMap = new HashMap<>();
Properties properties = sparkSettings.asProperties();
for (String key : properties.stringPropertyNames()) {
- if( key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)){
+ if (key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) {
String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length());
- streamLoadPropMap.put(subKey,properties.getProperty(key));
+ streamLoadPropMap.put(subKey, properties.getProperty(key));
}
}
return streamLoadPropMap;
@@ -310,7 +311,7 @@ public class DorisStreamLoad implements Serializable{
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
} catch (ExecutionException e) {
- throw new RuntimeException("get backends info fail",e);
+ throw new RuntimeException("get backends info fail", e);
}
}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index 09faf39..54771df 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -58,7 +58,7 @@ class TestSparkConnector {
("zhangsan", "m"),
("lisi", "f"),
("wangwu", "m")
- ))
+ )).toDF("name","gender")
df.write
.format("doris")
.option("doris.fenodes", dorisFeNodes)
@@ -66,7 +66,7 @@ class TestSparkConnector {
.option("user", dorisUser)
.option("password", dorisPwd)
//specify your field
- .option("doris.write.field", "name,gender")
+ .option("doris.write.fields", "name,gender")
.option("sink.batch.size",2)
.option("sink.max-retries",2)
.save()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org