You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/12/22 09:59:19 UTC

[doris-spark-connector] branch master updated: [fix] cache loader cannot be serialized (#63)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c727e36  [fix] cache loader cannot be serialized (#63)
c727e36 is described below

commit c727e36de804570da6ba5952e77e7160d000a2bf
Author: gnehil <ad...@gmail.com>
AuthorDate: Thu Dec 22 17:59:14 2022 +0800

    [fix] cache loader cannot be serialized (#63)
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 8cebe68..25ed7b1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -94,12 +94,7 @@ public class DorisStreamLoad implements Serializable{
         this.streamLoadProp=getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
-                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
-                    @Override
-                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
-                        return RestService.getBackendRows(settings, LOG);
-                    }
-                });
+                .build(new BackendCacheLoader(settings));
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -118,12 +113,7 @@ public class DorisStreamLoad implements Serializable{
         this.streamLoadProp=getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
-                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
-                    @Override
-                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
-                        return RestService.getBackendRows(settings, LOG);
-                    }
-                });
+                .build(new BackendCacheLoader(settings));
     }
 
     public String getLoadUrlStr() {
@@ -323,4 +313,23 @@ public class DorisStreamLoad implements Serializable{
             throw new RuntimeException("get backends info fail",e);
         }
     }
+
+    /**
+     * Serializable cache loader that fetches backend (BE) rows.
+     */
+    private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>> implements Serializable {
+
+        private final SparkSettings settings;
+
+        public BackendCacheLoader(SparkSettings settings) {
+            this.settings = settings;
+        }
+
+        @Override
+        public List<BackendV2.BackendRowV2> load(String key) throws Exception {
+            return RestService.getBackendRows(settings, LOG);
+        }
+
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org