You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 08:02:12 UTC
[inlong] 07/07: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 3db90d470d53118d8c8b9f7acd3c479b54f63a27
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Jan 5 14:58:31 2023 +0800
[INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
.../apache/inlong/sdk/sort/api/ClientContext.java | 2 +-
.../fetcher/pulsar/PulsarSingleTopicFetcher.java | 138 +++++++++++----------
2 files changed, 71 insertions(+), 69 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
- if (topic != null || config.isTopicStaticsEnabled()) {
+ if (topic != null && config.isTopicStaticsEnabled()) {
dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId());
dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
consumer.acknowledgeAsync(messageId)
.thenAccept(consumer -> ackSucc(msgOffset))
.exceptionally(exception -> {
- LOGGER.error("ack fail:{} {},error:{}",
- topic, msgOffset, exception.getMessage(), exception);
+ LOGGER.error("ack fail:{} {}",
+ topic, msgOffset, exception);
context.addAckFail(topic, -1);
return null;
});
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode());
this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName);
+ this.fetchThread.setDaemon(true);
this.fetchThread.start();
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.error("fail to create consumer", e);
return false;
}
return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
if (consumer != null) {
consumer.close();
}
- if (fetchThread != null) {
- fetchThread.interrupt();
- }
} catch (PulsarClientException e) {
LOGGER.warn(e.getMessage(), e);
}
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
} catch (Exception e) {
context.addCallBackFail(topic, -1, messageRecords.size(),
System.currentTimeMillis() - start);
- LOGGER.error("failed to callback {}", e.getMessage(), e);
+ LOGGER.error("failed to callback", e);
}
}
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
public void run() {
boolean hasPermit;
while (true) {
- hasPermit = false;
- long fetchTimeCost = -1;
try {
- if (context.getConfig().isStopConsume() || stopConsume) {
- TimeUnit.MILLISECONDS.sleep(50);
- continue;
- }
+ hasPermit = false;
+ long fetchTimeCost = -1;
+ try {
+ if (context.getConfig().isStopConsume() || stopConsume) {
+ TimeUnit.MILLISECONDS.sleep(50);
+ continue;
+ }
- if (sleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- }
+ if (sleepTime > 0) {
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
+ }
- context.acquireRequestPermit();
- hasPermit = true;
- context.addConsumeTime(topic, -1);
+ context.acquireRequestPermit();
+ hasPermit = true;
+ context.addConsumeTime(topic, -1);
- long startFetchTime = System.currentTimeMillis();
- Messages<byte[]> messages = consumer.batchReceive();
- fetchTimeCost = System.currentTimeMillis() - startFetchTime;
- if (null != messages && messages.size() != 0) {
- for (Message<byte[]> msg : messages) {
- // if need seek
- if (msg.getPublishTime() < seeker.getSeekTime()) {
- seeker.seek();
- break;
- }
+ long startFetchTime = System.currentTimeMillis();
+ Messages<byte[]> messages = consumer.batchReceive();
+ fetchTimeCost = System.currentTimeMillis() - startFetchTime;
+ if (null != messages && messages.size() != 0) {
+ for (Message<byte[]> msg : messages) {
+ // if need seek
+ if (msg.getPublishTime() < seeker.getSeekTime()) {
+ seeker.seek();
+ break;
+ }
- String offsetKey = getOffset(msg.getMessageId());
- offsetCache.put(offsetKey, msg.getMessageId());
+ String offsetKey = getOffset(msg.getMessageId());
+ offsetCache.put(offsetKey, msg.getMessageId());
- // deserialize
- List<InLongMessage> inLongMessages = deserializer
- .deserialize(context, topic, msg.getProperties(), msg.getData());
- context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
- fetchTimeCost);
- int originSize = inLongMessages.size();
- // intercept
- inLongMessages = interceptor.intercept(inLongMessages);
- if (inLongMessages.isEmpty()) {
- ack(offsetKey);
- continue;
- }
- int filterSize = originSize - inLongMessages.size();
- context.addConsumeFilter(topic, -1, filterSize);
+ // deserialize
+ List<InLongMessage> inLongMessages = deserializer
+ .deserialize(context, topic, msg.getProperties(), msg.getData());
+ context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
+ fetchTimeCost);
+ int originSize = inLongMessages.size();
+ // intercept
+ inLongMessages = interceptor.intercept(inLongMessages);
+ if (inLongMessages.isEmpty()) {
+ ack(offsetKey);
+ continue;
+ }
+ int filterSize = originSize - inLongMessages.size();
+ context.addConsumeFilter(topic, -1, filterSize);
- List<MessageRecord> msgs = new ArrayList<>();
- msgs.add(new MessageRecord(topic.getTopicKey(),
- inLongMessages,
- offsetKey, System.currentTimeMillis()));
- handleAndCallbackMsg(msgs);
+ List<MessageRecord> msgs = new ArrayList<>();
+ msgs.add(new MessageRecord(topic.getTopicKey(),
+ inLongMessages,
+ offsetKey, System.currentTimeMillis()));
+ handleAndCallbackMsg(msgs);
+ }
+ sleepTime = 0L;
+ } else {
+ context.addConsumeEmpty(topic, -1, fetchTimeCost);
+ emptyFetchTimes++;
+ if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+ sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
+ context.getConfig().getMaxEmptyPollSleepMs());
+ emptyFetchTimes = 0;
+ }
}
- sleepTime = 0L;
- } else {
- context.addConsumeEmpty(topic, -1, fetchTimeCost);
- emptyFetchTimes++;
- if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
- sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
- context.getConfig().getMaxEmptyPollSleepMs());
- emptyFetchTimes = 0;
+ } catch (Exception e) {
+ context.addConsumeError(topic, -1, fetchTimeCost);
+ LOGGER.error("failed to fetch msg", e);
+ } finally {
+ if (hasPermit) {
+ context.releaseRequestPermit();
}
}
- } catch (Exception e) {
- context.addConsumeError(topic, -1, fetchTimeCost);
- LOGGER.error("failed to fetch msg: {}", e.getMessage(), e);
- } finally {
- if (hasPermit) {
- context.releaseRequestPermit();
- }
- }
- if (closed) {
- break;
+ if (closed) {
+ break;
+ }
+ } catch (Throwable t) {
+ LOGGER.error("got exception while process fetching", t);
}
}
}