Posted to commits@doris.apache.org by di...@apache.org on 2023/01/28 02:17:39 UTC

[doris-spark-connector] branch master updated: [improve]Return specific error information after optimizing the exception (#64)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 97d2222  [improve]Return specific error information after optimizing the exception (#64)
97d2222 is described below

commit 97d2222153f57db4b5616b36c18a7711e5ebcbe4
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Sat Jan 28 10:17:34 2023 +0800

    [improve]Return specific error information after optimizing the exception (#64)
    
    * Return specific error information after optimizing the exception
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 49 +++++++++++-----------
 .../doris/spark/sql/TestSparkConnector.scala       |  4 +-
 2 files changed, 27 insertions(+), 26 deletions(-)
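
Beyond whitespace and brace-style cleanups, the substantive change is in DorisStreamLoad.load(String): when Doris reports a non-success stream-load status, the connector now logs the full response at ERROR level and the thrown StreamLoadException carries the whole LoadResponse (HTTP status, reason phrase and response body) rather than only the parsed message field. A minimal sketch of how calling code can surface that richer message follows; the wrapper class, the way the loader is constructed, and the exception's package path are assumptions for illustration, not part of this commit.

    import org.apache.doris.spark.DorisStreamLoad;
    import org.apache.doris.spark.exception.StreamLoadException; // assumed package path

    // Hypothetical caller: the DorisStreamLoad instance is assumed to be built
    // elsewhere from SparkSettings (see the constructors touched in the diff below).
    public class StreamLoadCaller {
        public static void loadOrReport(DorisStreamLoad loader, String jsonRows) {
            try {
                loader.load(jsonRows);
            } catch (StreamLoadException e) {
                // After this commit the message contains the full LoadResponse
                // (status, respMsg, respContent), not just RespContent.getMessage().
                System.err.println("stream load failed: " + e.getMessage());
            }
        }
    }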

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 25ed7b1..351ef23 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
     public static final String FIELD_DELIMITER = "\t";
     public static final String LINE_DELIMITER = "\n";
     public static final String NULL_VALUE = "\\N";
@@ -68,7 +68,7 @@ public class DorisStreamLoad implements Serializable{
     private String columns;
     private String[] dfColumns;
     private String maxFilterRatio;
-    private Map<String,String> streamLoadProp;
+    private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
 
@@ -91,7 +91,7 @@ public class DorisStreamLoad implements Serializable{
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
-        this.streamLoadProp=getStreamLoadProp(settings);
+        this.streamLoadProp = getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
@@ -110,7 +110,7 @@ public class DorisStreamLoad implements Serializable{
         this.dfColumns = dfColumns;
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
-        this.streamLoadProp=getStreamLoadProp(settings);
+        this.streamLoadProp = getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
@@ -142,15 +142,15 @@ public class DorisStreamLoad implements Serializable{
 
         conn.setDoOutput(true);
         conn.setDoInput(true);
-        if(streamLoadProp != null ){
-            streamLoadProp.forEach((k,v) -> {
-                if(streamLoadProp.containsKey("format")){
+        if (streamLoadProp != null) {
+            streamLoadProp.forEach((k, v) -> {
+                if (streamLoadProp.containsKey("format")) {
                     return;
                 }
-                if(streamLoadProp.containsKey("strip_outer_array")) {
+                if (streamLoadProp.containsKey("strip_outer_array")) {
                     return;
                 }
-                if(streamLoadProp.containsKey("read_json_by_line")){
+                if (streamLoadProp.containsKey("read_json_by_line")) {
                     return;
                 }
                 conn.addRequestProperty(k, v);
@@ -171,6 +171,7 @@ public class DorisStreamLoad implements Serializable{
             this.respMsg = respMsg;
             this.respContent = respContent;
         }
+
         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
@@ -199,10 +200,10 @@ public class DorisStreamLoad implements Serializable{
 
 
     public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
-        List<Map<Object,Object>> dataList = new ArrayList<>();
+        List<Map<Object, Object>> dataList = new ArrayList<>();
         try {
             for (List<Object> row : rows) {
-                Map<Object,Object> dataMap = new HashMap<>();
+                Map<Object, Object> dataMap = new HashMap<>();
                 if (dfColumns.length == row.size()) {
                     for (int i = 0; i < dfColumns.length; i++) {
                         dataMap.put(dfColumns[i], row.get(i));
@@ -222,18 +223,18 @@ public class DorisStreamLoad implements Serializable{
 
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
-        if(loadResponse.status != 200){
-            LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
+        if (loadResponse.status != 200) {
+            LOG.info("Streamload Response HTTP Status Error:{}", loadResponse);
             throw new StreamLoadException("stream load error: " + loadResponse.respContent);
-        }else{
-            LOG.info("Streamload Response:{}",loadResponse);
+        } else {
             ObjectMapper obj = new ObjectMapper();
             try {
                 RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
-                if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
-                    LOG.info("Streamload Response RES STATUS Error:{}", loadResponse);
-                    throw new StreamLoadException("stream load error: " + respContent.getMessage());
+                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                    LOG.error("Streamload Response RES STATUS Error:{}", loadResponse);
+                    throw new StreamLoadException("stream load error: " + loadResponse);
                 }
+                LOG.info("Streamload Response:{}", loadResponse);
             } catch (IOException e) {
                 throw new StreamLoadException(e);
             }
@@ -277,7 +278,7 @@ public class DorisStreamLoad implements Serializable{
 
         } catch (Exception e) {
             e.printStackTrace();
-            String err = "http request exception,load url : "+loadUrlStr+",failed to execute spark streamload with label: " + label;
+            String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label;
             LOG.warn(err, e);
             return new LoadResponse(status, e.getMessage(), err);
         } finally {
@@ -290,13 +291,13 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
-    public Map<String,String> getStreamLoadProp(SparkSettings sparkSettings){
-        Map<String,String> streamLoadPropMap = new HashMap<>();
+    public Map<String, String> getStreamLoadProp(SparkSettings sparkSettings) {
+        Map<String, String> streamLoadPropMap = new HashMap<>();
         Properties properties = sparkSettings.asProperties();
         for (String key : properties.stringPropertyNames()) {
-            if( key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)){
+            if (key.contains(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX)) {
                 String subKey = key.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length());
-                streamLoadPropMap.put(subKey,properties.getProperty(key));
+                streamLoadPropMap.put(subKey, properties.getProperty(key));
             }
         }
         return streamLoadPropMap;
@@ -310,7 +311,7 @@ public class DorisStreamLoad implements Serializable{
             BackendV2.BackendRowV2 backend = backends.get(0);
             return backend.getIp() + ":" + backend.getHttpPort();
         } catch (ExecutionException e) {
-            throw new RuntimeException("get backends info fail",e);
+            throw new RuntimeException("get backends info fail", e);
         }
     }
 
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index 09faf39..54771df 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -58,7 +58,7 @@ class TestSparkConnector {
       ("zhangsan", "m"),
       ("lisi", "f"),
       ("wangwu", "m")
-    ))
+    )).toDF("name","gender")
     df.write
       .format("doris")
       .option("doris.fenodes", dorisFeNodes)
@@ -66,7 +66,7 @@ class TestSparkConnector {
       .option("user", dorisUser)
       .option("password", dorisPwd)
       //specify your field
-      .option("doris.write.field", "name,gender")
+      .option("doris.write.fields", "name,gender")
       .option("sink.batch.size",2)
       .option("sink.max-retries",2)
       .save()
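
For completeness, the corrected test now names the DataFrame columns and uses the supported option key doris.write.fields. Below is a self-contained sketch of the same write path through Spark's Java API; the FE address, table identifier value and credentials are placeholders, not values from this commit.

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class DorisWriteExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

            // Name the columns explicitly so they can be mapped to Doris columns
            // via doris.write.fields, mirroring the fixed Scala test above.
            StructType schema = new StructType()
                    .add("name", DataTypes.StringType)
                    .add("gender", DataTypes.StringType);
            Dataset<Row> df = spark.createDataFrame(Arrays.asList(
                    RowFactory.create("zhangsan", "m"),
                    RowFactory.create("lisi", "f"),
                    RowFactory.create("wangwu", "m")), schema);

            df.write()
                    .format("doris")
                    .option("doris.fenodes", "127.0.0.1:8030")              // placeholder FE host:port
                    .option("doris.table.identifier", "test_db.test_table") // placeholder db.table
                    .option("user", "root")                                 // placeholder credentials
                    .option("password", "")
                    .option("doris.write.fields", "name,gender")            // corrected option name
                    .option("sink.batch.size", "2")
                    .option("sink.max-retries", "2")
                    .save();

            spark.stop();
        }
    }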


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org