You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/01 09:37:31 UTC
[incubator-inlong] branch master updated: [INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab27830e8 [INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)
ab27830e8 is described below
commit ab27830e822b65696655df065cc7a38f08650073
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Jun 1 17:37:26 2022 +0800
[INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)
---
.../standalone/sink/kafka/KafkaFederationSink.java | 13 +------------
.../sink/kafka/KafkaFederationWorker.java | 12 ++----------
.../sink/kafka/KafkaProducerCluster.java | 5 +++--
.../standalone/source/sortsdk/FetchCallback.java | 22 ++++++++++------------
.../standalone/source/sortsdk/SortSdkSource.java | 16 ++++++++--------
5 files changed, 24 insertions(+), 44 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
index b163ef198..ac10fa9ac 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
@@ -21,35 +21,24 @@ import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class KafkaFederationSink extends AbstractSink implements Configurable {
private static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationSink.class);
private Context parentContext;
private KafkaFederationSinkContext context;
private List<KafkaFederationWorker> workers = new ArrayList<>();
- private Map<String, String> dimensions;
/** init and start workers */
@Override
public void start() {
String sinkName = this.getName();
- if (getChannel() == null) {
- LOG.error("channel is null");
- }
- this.context = new KafkaFederationSinkContext(getName(), parentContext, getChannel());
+ this.context = new KafkaFederationSinkContext(sinkName, parentContext, getChannel());
this.context.start();
- this.dimensions = new HashMap<>();
- this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.context.getClusterId());
- this.dimensions.put(SortMetricItem.KEY_TASK_NAME, this.context.getTaskName());
- this.dimensions.put(SortMetricItem.KEY_SINK_ID, this.context.getSinkName());
// create worker
for (int i = 0; i < context.getMaxThreads(); i++) {
KafkaFederationWorker worker = new KafkaFederationWorker(sinkName, i, context);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 97f26c026..f3d7127d2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -28,7 +28,6 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.Map;
@@ -89,10 +88,6 @@ public class KafkaFederationWorker extends Thread {
Transaction tx = null;
try {
Channel channel = context.getChannel();
- if (channel == null) {
- LOG.error("in kafka worker, channel is null ");
- break;
- }
tx = channel.getTransaction();
tx.begin();
Event rowEvent = channel.take();
@@ -140,11 +135,8 @@ public class KafkaFederationWorker extends Thread {
String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
String topic = this.context.getTopic(uid);
- if (!StringUtils.isBlank(topic)) {
- headers.put(Constants.TOPIC, topic);
- return topic;
- }
- return "-";
+ headers.put(Constants.TOPIC, topic);
+ return topic;
}
/** sleepOneInterval */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 52c8bdb4c..94d468760 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -132,6 +132,7 @@ public class KafkaProducerCluster implements LifecycleAware {
tx.commit();
profileEvent.ack();
tx.close();
+ sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
return true;
}
try {
@@ -145,7 +146,7 @@ public class KafkaProducerCluster implements LifecycleAware {
LOG.error(String.format("send failed, topic is %s, partition is %s",
metadata.topic(), metadata.partition()), ex);
tx.rollback();
- sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
+ sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
}
tx.close();
});
@@ -154,7 +155,7 @@ public class KafkaProducerCluster implements LifecycleAware {
tx.rollback();
tx.close();
LOG.error(e.getMessage(), e);
- sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
+ sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
return false;
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 2fd854efa..458a22f4e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -53,7 +53,7 @@ public class FetchCallback implements ReadCallback {
private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
// SortId of fetch message.
- private final String sortId;
+ private final String sortTaskName;
// ChannelProcessor that put message in specific channel.
private final ChannelProcessor channelProcessor;
@@ -68,15 +68,15 @@ public class FetchCallback implements ReadCallback {
* Private constructor of {@link FetchCallback}.
* <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
*
- * @param sortId SortId of fetch message.
+ * @param sortTaskName Sort task name of fetch message.
* @param channelProcessor ChannelProcessor that message put in.
* @param context The context to report fetch results.
*/
private FetchCallback(
- final String sortId,
+ final String sortTaskName,
final ChannelProcessor channelProcessor,
final SortSdkSourceContext context) {
- this.sortId = sortId;
+ this.sortTaskName = sortTaskName;
this.channelProcessor = channelProcessor;
this.context = context;
}
@@ -105,23 +105,21 @@ public class FetchCallback implements ReadCallback {
Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client);
for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
- //TODO fix here
final SubscribeFetchResult result = SubscribeFetchResult.Factory
- .create(sortId, messageRecord.getMsgKey(), messageRecord.getOffset(),
+ .create(sortTaskName, messageRecord.getMsgKey(), messageRecord.getOffset(),
inLongMessage.getParams(), messageRecord.getRecTime(),
inLongMessage.getBody());
final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(),
cacheRecord);
channelProcessor.processEvent(profileEvent);
- context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+ context.reportToMetric(profileEvent, sortTaskName, "-", SortSdkSourceContext.FetchResult.SUCCESS);
}
-
-// client.ack(messageRecord.getMsgKey(), messageRecord.getOffset());
} catch (NullPointerException npe) {
- LOG.error("Got a null pointer exception for sortId " + sortId, npe);
- context.reportToMetric(null, sortId, "-", SortSdkSourceContext.FetchResult.FAILURE);
+ LOG.error("Got a null pointer exception for sort task " + sortTaskName, npe);
+ context.reportToMetric(null, sortTaskName, "-", SortSdkSourceContext.FetchResult.FAILURE);
} catch (Exception e) {
- LOG.error("Ack failed for sortId " + sortId, e);
+ LOG.error("Got exception of sort task " + sortTaskName, e);
+ context.reportToMetric(null, sortTaskName, "-", SortSdkSourceContext.FetchResult.FAILURE);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 42b8b34ff..9e15b71ef 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -151,23 +151,23 @@ public final class SortSdkSource extends AbstractSource
}
/**
- * Create one {@link SortClient} with specific sort id.
+ * Create one {@link SortClient} with specific sort task.
*
* <p>
* In current version, the {@link FetchCallback} will hold the client to ACK. For more details see
* {@link FetchCallback#onFinished}
* </p>
*
- * @param sortId Sort in of new client.
+ * @param sortTaskName Sort task name of new client.
* @return New sort client.
*/
- private SortClient newClient(final String sortId) {
- LOG.info("Start to new sort client for id: {}", sortId);
+ private SortClient newClient(final String sortTaskName) {
+ LOG.info("Start to new sort client for task: {}", sortTaskName);
try {
- final SortClientConfig clientConfig = new SortClientConfig(sortId, this.sortClusterName,
+ final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName,
new DefaultTopicChangeListener(),
SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
- final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
+ final FetchCallback callback = FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
clientConfig.setCallback(callback);
// create SortClient
@@ -211,9 +211,9 @@ public final class SortSdkSource extends AbstractSource
callback.setClient(client);
return client;
} catch (UnknownHostException ex) {
- LOG.error("Got one UnknownHostException when init client of id:{}", sortId, ex);
+ LOG.error("Got one UnknownHostException when init client of id:{}", sortTaskName, ex);
} catch (Throwable th) {
- LOG.error("Got one throwable when init client of id:{}", sortId, th);
+ LOG.error("Got one throwable when init client of id:{}", sortTaskName, th);
}
return null;
}