You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/01 09:37:31 UTC

[incubator-inlong] branch master updated: [INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab27830e8 [INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)
ab27830e8 is described below

commit ab27830e822b65696655df065cc7a38f08650073
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Jun 1 17:37:26 2022 +0800

    [INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to kafka failed (#4454)
---
 .../standalone/sink/kafka/KafkaFederationSink.java | 13 +------------
 .../sink/kafka/KafkaFederationWorker.java          | 12 ++----------
 .../sink/kafka/KafkaProducerCluster.java           |  5 +++--
 .../standalone/source/sortsdk/FetchCallback.java   | 22 ++++++++++------------
 .../standalone/source/sortsdk/SortSdkSource.java   | 16 ++++++++--------
 5 files changed, 24 insertions(+), 44 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
index b163ef198..ac10fa9ac 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
@@ -21,35 +21,24 @@ import org.apache.flume.Context;
 import org.apache.flume.Sink;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class KafkaFederationSink extends AbstractSink implements Configurable {
     private static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationSink.class);
     private Context parentContext;
     private KafkaFederationSinkContext context;
     private List<KafkaFederationWorker> workers = new ArrayList<>();
-    private Map<String, String> dimensions;
 
     /** init and start workers */
     @Override
     public void start() {
         String sinkName = this.getName();
-        if (getChannel() == null) {
-            LOG.error("channel is null");
-        }
-        this.context = new KafkaFederationSinkContext(getName(), parentContext, getChannel());
+        this.context = new KafkaFederationSinkContext(sinkName, parentContext, getChannel());
         this.context.start();
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.context.getClusterId());
-        this.dimensions.put(SortMetricItem.KEY_TASK_NAME, this.context.getTaskName());
-        this.dimensions.put(SortMetricItem.KEY_SINK_ID, this.context.getSinkName());
         // create worker
         for (int i = 0; i < context.getMaxThreads(); i++) {
             KafkaFederationWorker worker = new KafkaFederationWorker(sinkName, i, context);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 97f26c026..f3d7127d2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -28,7 +28,6 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import java.util.Map;
@@ -89,10 +88,6 @@ public class KafkaFederationWorker extends Thread {
             Transaction tx = null;
             try {
                 Channel channel = context.getChannel();
-                if (channel == null) {
-                    LOG.error("in kafka worker, channel is null ");
-                    break;
-                }
                 tx = channel.getTransaction();
                 tx.begin();
                 Event rowEvent = channel.take();
@@ -140,11 +135,8 @@ public class KafkaFederationWorker extends Thread {
         String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
         String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         String topic = this.context.getTopic(uid);
-        if (!StringUtils.isBlank(topic)) {
-            headers.put(Constants.TOPIC, topic);
-            return topic;
-        }
-        return "-";
+        headers.put(Constants.TOPIC, topic);
+        return topic;
     }
 
     /** sleepOneInterval */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 52c8bdb4c..94d468760 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -132,6 +132,7 @@ public class KafkaProducerCluster implements LifecycleAware {
             tx.commit();
             profileEvent.ack();
             tx.close();
+            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
             return true;
         }
         try {
@@ -145,7 +146,7 @@ public class KafkaProducerCluster implements LifecycleAware {
                             LOG.error(String.format("send failed, topic is %s, partition is %s",
                                     metadata.topic(), metadata.partition()), ex);
                             tx.rollback();
-                            sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
+                            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
                         }
                         tx.close();
                     });
@@ -154,7 +155,7 @@ public class KafkaProducerCluster implements LifecycleAware {
             tx.rollback();
             tx.close();
             LOG.error(e.getMessage(), e);
-            sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
+            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
             return false;
         }
     }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 2fd854efa..458a22f4e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -53,7 +53,7 @@ public class FetchCallback implements ReadCallback {
     private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
 
     // SortId of fetch message.
-    private final String sortId;
+    private final String sortTaskName;
 
     // ChannelProcessor that put message in specific channel.
     private final ChannelProcessor channelProcessor;
@@ -68,15 +68,15 @@ public class FetchCallback implements ReadCallback {
      * Private constructor of {@link FetchCallback}.
      * <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
      *
-     * @param sortId SortId of fetch message.
+     * @param sortTaskName Name of the sort task whose messages are fetched.
      * @param channelProcessor ChannelProcessor that message put in.
      * @param context The context to report fetch results.
      */
     private FetchCallback(
-            final String sortId,
+            final String sortTaskName,
             final ChannelProcessor channelProcessor,
             final SortSdkSourceContext context) {
-        this.sortId = sortId;
+        this.sortTaskName = sortTaskName;
         this.channelProcessor = channelProcessor;
         this.context = context;
     }
@@ -105,23 +105,21 @@ public class FetchCallback implements ReadCallback {
             Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
             CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client);
             for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
-                //TODO fix here
                 final SubscribeFetchResult result = SubscribeFetchResult.Factory
-                        .create(sortId, messageRecord.getMsgKey(), messageRecord.getOffset(),
+                        .create(sortTaskName, messageRecord.getMsgKey(), messageRecord.getOffset(),
                                 inLongMessage.getParams(), messageRecord.getRecTime(),
                                 inLongMessage.getBody());
                 final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), 
                         cacheRecord);
                 channelProcessor.processEvent(profileEvent);
-                context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+                context.reportToMetric(profileEvent, sortTaskName, "-", SortSdkSourceContext.FetchResult.SUCCESS);
             }
-
-//            client.ack(messageRecord.getMsgKey(), messageRecord.getOffset());
         } catch (NullPointerException npe) {
-            LOG.error("Got a null pointer exception for sortId " + sortId, npe);
-            context.reportToMetric(null, sortId, "-", SortSdkSourceContext.FetchResult.FAILURE);
+            LOG.error("Got a null pointer exception for sort task " + sortTaskName, npe);
+            context.reportToMetric(null, sortTaskName, "-", SortSdkSourceContext.FetchResult.FAILURE);
         } catch (Exception e) {
-            LOG.error("Ack failed for sortId " + sortId, e);
+            LOG.error("Got exception of sort task " + sortTaskName, e);
+            context.reportToMetric(null, sortTaskName, "-", SortSdkSourceContext.FetchResult.FAILURE);
         }
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 42b8b34ff..9e15b71ef 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -151,23 +151,23 @@ public final class SortSdkSource extends AbstractSource
     }
 
     /**
-     * Create one {@link SortClient} with specific sort id.
+     * Create one {@link SortClient} with specific sort task.
      *
      * <p>
      * In current version, the {@link FetchCallback} will hold the client to ACK. For more details see
      * {@link FetchCallback#onFinished}
      * </p>
      *
-     * @param  sortId Sort in of new client.
+     * @param  sortTaskName Sort task name of the new client.
      * @return        New sort client.
      */
-    private SortClient newClient(final String sortId) {
-        LOG.info("Start to new sort client for id: {}", sortId);
+    private SortClient newClient(final String sortTaskName) {
+        LOG.info("Start to new sort client for task: {}", sortTaskName);
         try {
-            final SortClientConfig clientConfig = new SortClientConfig(sortId, this.sortClusterName,
+            final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName,
                     new DefaultTopicChangeListener(),
                     SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
-            final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
+            final FetchCallback callback = FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
             clientConfig.setCallback(callback);
 
             // create SortClient
@@ -211,9 +211,9 @@ public final class SortSdkSource extends AbstractSource
             callback.setClient(client);
             return client;
         } catch (UnknownHostException ex) {
-            LOG.error("Got one UnknownHostException when init client of id:{}", sortId, ex);
+            LOG.error("Got one UnknownHostException when init client of id:{}", sortTaskName, ex);
         } catch (Throwable th) {
-            LOG.error("Got one throwable when init client of id:{}", sortId, th);
+            LOG.error("Got one throwable when init client of id:{}", sortTaskName, th);
         }
         return null;
     }