You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 08:02:12 UTC

[inlong] 07/07: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 3db90d470d53118d8c8b9f7acd3c479b54f63a27
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Jan 5 14:58:31 2023 +0800

    [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++++++++++----------
 2 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
     private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
-        if (topic != null || config.isTopicStaticsEnabled()) {
+        if (topic != null && config.isTopicStaticsEnabled()) {
             dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId());
             dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
         }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
                 consumer.acknowledgeAsync(messageId)
                         .thenAccept(consumer -> ackSucc(msgOffset))
                         .exceptionally(exception -> {
-                            LOGGER.error("ack fail:{} {},error:{}",
-                                    topic, msgOffset, exception.getMessage(), exception);
+                            LOGGER.error("ack fail:{} {}",
+                                    topic, msgOffset, exception);
                             context.addAckFail(topic, -1);
                             return null;
                         });
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
                     this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode());
             this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName);
+            this.fetchThread.setDaemon(true);
             this.fetchThread.start();
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            LOGGER.error("fail to create consumer", e);
             return false;
         }
         return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
                 if (consumer != null) {
                     consumer.close();
                 }
-                if (fetchThread != null) {
-                    fetchThread.interrupt();
-                }
             } catch (PulsarClientException e) {
                 LOGGER.warn(e.getMessage(), e);
             }
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             } catch (Exception e) {
                 context.addCallBackFail(topic, -1, messageRecords.size(),
                         System.currentTimeMillis() - start);
-                LOGGER.error("failed to callback {}", e.getMessage(), e);
+                LOGGER.error("failed to callback", e);
             }
         }
 
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
         public void run() {
             boolean hasPermit;
             while (true) {
-                hasPermit = false;
-                long fetchTimeCost = -1;
                 try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
+                    hasPermit = false;
+                    long fetchTimeCost = -1;
+                    try {
+                        if (context.getConfig().isStopConsume() || stopConsume) {
+                            TimeUnit.MILLISECONDS.sleep(50);
+                            continue;
+                        }
 
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
+                        if (sleepTime > 0) {
+                            TimeUnit.MILLISECONDS.sleep(sleepTime);
+                        }
 
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    context.addConsumeTime(topic, -1);
+                        context.acquireRequestPermit();
+                        hasPermit = true;
+                        context.addConsumeTime(topic, -1);
 
-                    long startFetchTime = System.currentTimeMillis();
-                    Messages<byte[]> messages = consumer.batchReceive();
-                    fetchTimeCost = System.currentTimeMillis() - startFetchTime;
-                    if (null != messages && messages.size() != 0) {
-                        for (Message<byte[]> msg : messages) {
-                            // if need seek
-                            if (msg.getPublishTime() < seeker.getSeekTime()) {
-                                seeker.seek();
-                                break;
-                            }
+                        long startFetchTime = System.currentTimeMillis();
+                        Messages<byte[]> messages = consumer.batchReceive();
+                        fetchTimeCost = System.currentTimeMillis() - startFetchTime;
+                        if (null != messages && messages.size() != 0) {
+                            for (Message<byte[]> msg : messages) {
+                                // if need seek
+                                if (msg.getPublishTime() < seeker.getSeekTime()) {
+                                    seeker.seek();
+                                    break;
+                                }
 
-                            String offsetKey = getOffset(msg.getMessageId());
-                            offsetCache.put(offsetKey, msg.getMessageId());
+                                String offsetKey = getOffset(msg.getMessageId());
+                                offsetCache.put(offsetKey, msg.getMessageId());
 
-                            // deserialize
-                            List<InLongMessage> inLongMessages = deserializer
-                                    .deserialize(context, topic, msg.getProperties(), msg.getData());
-                            context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
-                                    fetchTimeCost);
-                            int originSize = inLongMessages.size();
-                            // intercept
-                            inLongMessages = interceptor.intercept(inLongMessages);
-                            if (inLongMessages.isEmpty()) {
-                                ack(offsetKey);
-                                continue;
-                            }
-                            int filterSize = originSize - inLongMessages.size();
-                            context.addConsumeFilter(topic, -1, filterSize);
+                                // deserialize
+                                List<InLongMessage> inLongMessages = deserializer
+                                        .deserialize(context, topic, msg.getProperties(), msg.getData());
+                                context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
+                                        fetchTimeCost);
+                                int originSize = inLongMessages.size();
+                                // intercept
+                                inLongMessages = interceptor.intercept(inLongMessages);
+                                if (inLongMessages.isEmpty()) {
+                                    ack(offsetKey);
+                                    continue;
+                                }
+                                int filterSize = originSize - inLongMessages.size();
+                                context.addConsumeFilter(topic, -1, filterSize);
 
-                            List<MessageRecord> msgs = new ArrayList<>();
-                            msgs.add(new MessageRecord(topic.getTopicKey(),
-                                    inLongMessages,
-                                    offsetKey, System.currentTimeMillis()));
-                            handleAndCallbackMsg(msgs);
+                                List<MessageRecord> msgs = new ArrayList<>();
+                                msgs.add(new MessageRecord(topic.getTopicKey(),
+                                        inLongMessages,
+                                        offsetKey, System.currentTimeMillis()));
+                                handleAndCallbackMsg(msgs);
+                            }
+                            sleepTime = 0L;
+                        } else {
+                            context.addConsumeEmpty(topic, -1, fetchTimeCost);
+                            emptyFetchTimes++;
+                            if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+                                sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
+                                        context.getConfig().getMaxEmptyPollSleepMs());
+                                emptyFetchTimes = 0;
+                            }
                         }
-                        sleepTime = 0L;
-                    } else {
-                        context.addConsumeEmpty(topic, -1, fetchTimeCost);
-                        emptyFetchTimes++;
-                        if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
-                            sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
-                                    context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyFetchTimes = 0;
+                    } catch (Exception e) {
+                        context.addConsumeError(topic, -1, fetchTimeCost);
+                        LOGGER.error("failed to fetch msg", e);
+                    } finally {
+                        if (hasPermit) {
+                            context.releaseRequestPermit();
                         }
                     }
-                } catch (Exception e) {
-                    context.addConsumeError(topic, -1, fetchTimeCost);
-                    LOGGER.error("failed to fetch msg: {}", e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
 
-                if (closed) {
-                    break;
+                    if (closed) {
+                        break;
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("got exception while process fetching", t);
                 }
             }
         }