You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/07 05:59:58 UTC

[inlong] branch master updated: [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab0d7e9ef [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)
ab0d7e9ef is described below

commit ab0d7e9ef6308197a89e49ec2b3f49a941c71104
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Sep 7 13:59:53 2022 +0800

    [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)
---
 .../standalone/utils/FlumeConfigGenerator.java     | 25 ++++++++++++++++------
 .../rollback/TimeBasedFilterInterceptor.java       | 17 +++++++--------
 .../standalone/source/sortsdk/SortSdkSource.java   |  2 +-
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index 2981d1982..ca3e1b102 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -33,8 +33,9 @@ public class FlumeConfigGenerator {
     public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
     public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
     public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
-    public static final String KEY_SDK_START_TIME = "sortSdk.startTime";
-    public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
+    public static final String KEY_SORT_INTERCEPTOR_TYPE = "interceptor.type";
+    public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
+    public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
 
     public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) {
         Map<String, String> flumeConf = new HashMap<>();
@@ -161,12 +162,24 @@ public class FlumeConfigGenerator {
         flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector");
         // valid msg time interval
         builder.setLength(0);
-        String startTimeKey = builder.append(prefix).append(KEY_SDK_START_TIME).toString();
-        Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME))
+        String interceptorKey = builder.append(prefix).append("interceptors").toString();
+        String interceptorName = name + "Interceptor";
+        flumeConf.put(interceptorKey, interceptorName);
+
+        builder.setLength(0);
+        String interceptorType = builder.append(prefix).append("interceptors.").append(interceptorName)
+                .append(".type").toString();
+        Optional.ofNullable(CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE))
+                .map(type -> flumeConf.put(interceptorType, type));
+        builder.setLength(0);
+        String startTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
+                .append(KEY_ROLLBACK_START_TIME).toString();
+        Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_START_TIME))
                 .map(startTime -> flumeConf.put(startTimeKey, startTime));
         builder.setLength(0);
-        String stopTimeKey = builder.append(prefix).append(KEY_SDK_STOP_TIME).toString();
-        Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME))
+        String stopTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
+                .append(KEY_ROLLBACK_STOP_TIME).toString();
+        Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_STOP_TIME))
                 .map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
 
         appendCommon(flumeConf, prefix, null, name);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
index 5894a8b87..79739794a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
@@ -17,12 +17,10 @@
 
 package org.apache.inlong.sort.standalone.rollback;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.interceptor.Interceptor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.utils.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,15 +57,16 @@ public class TimeBasedFilterInterceptor implements Interceptor {
     @Override
     public Event intercept(Event event) {
         long logTime;
+        ProfileEvent profile;
         if (event instanceof ProfileEvent) {
-            ProfileEvent profile = (ProfileEvent) event;
+            profile = (ProfileEvent) event;
             logTime = profile.getRawLogTime();
         } else {
-            logTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
-                    System.currentTimeMillis());
+            return event;
         }
 
         if (logTime > stopTime || logTime < startTime) {
+            profile.ack();
             return null;
         }
         return event;
@@ -93,9 +92,9 @@ public class TimeBasedFilterInterceptor implements Interceptor {
     public static class Builder implements Interceptor.Builder  {
 
         private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        private static final String START_TIME = "start-time";
+        private static final String START_TIME = "rollback.startTime";
         private static final long DEFAULT_START_TIME = 0L;
-        private static final String STOP_TIME = "stop-time";
+        private static final String STOP_TIME = "rollback.stopTime";
         private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE;
 
         private long startTime;
@@ -108,7 +107,7 @@ public class TimeBasedFilterInterceptor implements Interceptor {
 
         @Override
         public void configure(Context context) {
-             startTime = Optional.ofNullable(context.getString(START_TIME))
+            startTime = Optional.ofNullable(context.getString(START_TIME))
                     .map(s -> {
                         logger.info("config TimeBasedFilterInterceptor, start time is {}", s);
                         try {
@@ -120,7 +119,7 @@ public class TimeBasedFilterInterceptor implements Interceptor {
                     })
                     .orElse(DEFAULT_START_TIME);
 
-             stopTime = Optional.ofNullable(context.getString(STOP_TIME))
+            stopTime = Optional.ofNullable(context.getString(STOP_TIME))
                     .map(s -> {
                         logger.info("config TimeBasedFilterInterceptor, stop time is {}", s);
                         try {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 18cef6065..aee6a11f7 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -108,8 +108,8 @@ public final class SortSdkSource extends AbstractSource
      */
     @Override
     public synchronized void start() {
-        LOG.info("start to SortSdkSource:{}", taskName);
         int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+        LOG.info("start to SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum);
         for (int i = 0; i < sortSdkClientNum; i++) {
             this.sortClients.add(this.newClient(taskName));
         }