Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/29 02:40:12 UTC

[incubator-doris] branch master updated: [refactor][routineload] Remove unused client object from routine load (#9223)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 93a41b2625 [refactor][routineload] Remove unused client object from routine load (#9223)
93a41b2625 is described below

commit 93a41b2625af74255487b3ac439ec293b9682ee0
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Apr 29 10:40:07 2022 +0800

    [refactor][routineload] Remove unused client object from routine load (#9223)
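    For context: judging from the diff below, each of the three methods in
    KafkaUtil (getAllKafkaPartitions, getOffsetsForTimes, getLatestOffsets)
    declared a Thrift BackendService.Client and an ok flag, and in a finally
    block either returned that client to ClientPool.backendPool or
    invalidated it. All 26 deleted lines are those declarations, the finally
    blocks, and the two now-unused imports; no borrowObject call is removed,
    so the client apparently stayed null and the bookkeeping was dead code.
    The BackendServiceProxy and InternalService imports are kept, which
    suggests these RPCs already go through the backend proxy instead of a
    pooled Thrift client.

    The sketch below is only a reminder of the pooled-client idiom the
    deleted blocks implemented: borrow a client, mark ok = true once the
    call succeeds, then return the client on success or invalidate it on
    failure so a possibly broken connection is not reused. It is a minimal,
    hypothetical illustration; PooledClientIdiom, SimpleClientPool, RpcClient
    and callWithPooledClient are stand-ins written for this note, not Doris
    classes or APIs.

        // Minimal, self-contained sketch of the borrow/return idiom the deleted
        // finally blocks implemented. Every name below is illustrative only;
        // none of these classes exist in Doris.
        import java.util.ArrayDeque;
        import java.util.Deque;

        public class PooledClientIdiom {

            // Toy client; a real one would wrap a network connection.
            static class RpcClient {
                String call(String request) { return "response-to-" + request; }
                void close() { /* release the underlying connection */ }
            }

            // Toy pool: reuse healthy clients, discard suspect ones.
            static class SimpleClientPool {
                private final Deque<RpcClient> idle = new ArrayDeque<>();

                synchronized RpcClient borrowObject() {
                    return idle.isEmpty() ? new RpcClient() : idle.pop();
                }
                synchronized void returnObject(RpcClient c)     { idle.push(c); } // keep for reuse
                synchronized void invalidateObject(RpcClient c) { c.close(); }    // drop it
            }

            static final SimpleClientPool POOL = new SimpleClientPool();

            // The pattern the removed code followed, shown generically.
            static String callWithPooledClient(String request) {
                RpcClient client = null;
                boolean ok = false;
                try {
                    client = POOL.borrowObject();
                    String result = client.call(request);
                    ok = true;                        // mark success only after the call returns
                    return result;
                } finally {
                    if (client != null) {
                        if (ok) {
                            POOL.returnObject(client);     // success: connection is reusable
                        } else {
                            POOL.invalidateObject(client); // failure: connection may be broken
                        }
                    }
                }
            }

            public static void main(String[] args) {
                System.out.println(callWithPooledClient("list-kafka-partitions"));
            }
        }

    In KafkaUtil none of this bookkeeping was reachable, so removing it is
    purely dead-code cleanup with no behavior change.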
---
 .../org/apache/doris/common/util/KafkaUtil.java    | 26 ----------------------
 1 file changed, 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index ecbb897ab5..3739ab22cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -18,14 +18,12 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
@@ -47,9 +45,7 @@ public class KafkaUtil {
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
-        BackendService.Client client = null;
         TNetworkAddress address = null;
-        boolean ok = false;
         try {
             List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
             if (backendIds.isEmpty()) {
@@ -89,12 +85,6 @@ public class KafkaUtil {
             LOG.warn("failed to get partitions.", e);
             throw new LoadException(
                     "Failed to get all partitions of kafka topic: " + topic + ". error: " + e.getMessage());
-        } finally {
-            if (ok) {
-                ClientPool.backendPool.returnObject(address, client);
-            } else {
-                ClientPool.backendPool.invalidateObject(address, client);
-            }
         }
     }
 
@@ -104,10 +94,8 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
                                                                Map<String, String> convertedCustomProperties,
                                                                List<Pair<Integer, Long>> timestampOffsets) throws LoadException {
-        BackendService.Client client = null;
         TNetworkAddress address = null;
         LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
-        boolean ok = false;
         try {
             List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
             if (backendIds.isEmpty()) {
@@ -159,23 +147,15 @@ public class KafkaUtil {
             LOG.warn("failed to get offsets for times.", e);
             throw new LoadException(
                     "Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage());
-        } finally {
-            if (ok) {
-                ClientPool.backendPool.returnObject(address, client);
-            } else {
-                ClientPool.backendPool.invalidateObject(address, client);
-            }
         }
     }
 
     public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
                                                              Map<String, String> convertedCustomProperties,
                                                              List<Integer> partitionIds) throws LoadException {
-        BackendService.Client client = null;
         TNetworkAddress address = null;
         LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                 partitionIds, topic, taskId, jobId);
-        boolean ok = false;
         try {
             List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
             if (backendIds.isEmpty()) {
@@ -226,12 +206,6 @@ public class KafkaUtil {
             LOG.warn("failed to get latest offsets.", e);
             throw new LoadException(
                     "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
-        } finally {
-            if (ok) {
-                ClientPool.backendPool.returnObject(address, client);
-            } else {
-                ClientPool.backendPool.invalidateObject(address, client);
-            }
         }
     }
 }

