You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/12/22 09:59:19 UTC
[doris-spark-connector] branch master updated: [fix] cache loader cannot be serialized (#63)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c727e36 [fix] cache loader cannot be serialized (#63)
c727e36 is described below
commit c727e36de804570da6ba5952e77e7160d000a2bf
Author: gnehil <ad...@gmail.com>
AuthorDate: Thu Dec 22 17:59:14 2022 +0800
[fix] cache loader cannot be serialized (#63)
---
.../org/apache/doris/spark/DorisStreamLoad.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 8cebe68..25ed7b1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -94,12 +94,7 @@ public class DorisStreamLoad implements Serializable{
this.streamLoadProp=getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
- .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
- @Override
- public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
- return RestService.getBackendRows(settings, LOG);
- }
- });
+ .build(new BackendCacheLoader(settings));
}
public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -118,12 +113,7 @@ public class DorisStreamLoad implements Serializable{
this.streamLoadProp=getStreamLoadProp(settings);
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
- .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
- @Override
- public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
- return RestService.getBackendRows(settings, LOG);
- }
- });
+ .build(new BackendCacheLoader(settings));
}
public String getLoadUrlStr() {
@@ -323,4 +313,23 @@ public class DorisStreamLoad implements Serializable{
throw new RuntimeException("get backends info fail",e);
}
}
+
+ /**
+ * serializable be cache loader
+ */
+ private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>> implements Serializable {
+
+ private final SparkSettings settings;
+
+ public BackendCacheLoader(SparkSettings settings) {
+ this.settings = settings;
+ }
+
+ @Override
+ public List<BackendV2.BackendRowV2> load(String key) throws Exception {
+ return RestService.getBackendRows(settings, LOG);
+ }
+
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org