You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 02:40:01 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 523ee6c81 [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)
523ee6c81 is described below
commit 523ee6c817868b1b9a0908ce723c495eba45c1ce
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Sep 7 13:59:53 2022 +0800
[INLONG-5805][Sort] Fix init interceptor incorrectly (#5806)
---
.../standalone/utils/FlumeConfigGenerator.java | 25 ++++++++++++++++------
.../rollback/TimeBasedFilterInterceptor.java | 17 +++++++--------
.../standalone/source/sortsdk/SortSdkSource.java | 2 +-
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index 2981d1982..ca3e1b102 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -33,8 +33,9 @@ public class FlumeConfigGenerator {
public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
- public static final String KEY_SDK_START_TIME = "sortSdk.startTime";
- public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
+ public static final String KEY_SORT_INTERCEPTOR_TYPE = "interceptor.type";
+ public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
+ public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
@@ -161,12 +162,24 @@ public class FlumeConfigGenerator {
flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector");
// valid msg time interval
builder.setLength(0);
- String startTimeKey = builder.append(prefix).append(KEY_SDK_START_TIME).toString();
- Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME))
+ String interceptorKey = builder.append(prefix).append("interceptors").toString();
+ String interceptorName = name + "Interceptor";
+ flumeConf.put(interceptorKey, interceptorName);
+
+ builder.setLength(0);
+ String interceptorType = builder.append(prefix).append("interceptors.").append(interceptorName)
+ .append(".type").toString();
+ Optional.ofNullable(CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE))
+ .map(type -> flumeConf.put(interceptorType, type));
+ builder.setLength(0);
+ String startTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
+ .append(KEY_ROLLBACK_START_TIME).toString();
+ Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_START_TIME))
.map(startTime -> flumeConf.put(startTimeKey, startTime));
builder.setLength(0);
- String stopTimeKey = builder.append(prefix).append(KEY_SDK_STOP_TIME).toString();
- Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME))
+ String stopTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
+ .append(KEY_ROLLBACK_STOP_TIME).toString();
+ Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_STOP_TIME))
.map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
appendCommon(flumeConf, prefix, null, name);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
index 5894a8b87..79739794a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java
@@ -17,12 +17,10 @@
package org.apache.inlong.sort.standalone.rollback;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,15 +57,16 @@ public class TimeBasedFilterInterceptor implements Interceptor {
@Override
public Event intercept(Event event) {
long logTime;
+ ProfileEvent profile;
if (event instanceof ProfileEvent) {
- ProfileEvent profile = (ProfileEvent) event;
+ profile = (ProfileEvent) event;
logTime = profile.getRawLogTime();
} else {
- logTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- System.currentTimeMillis());
+ return event;
}
if (logTime > stopTime || logTime < startTime) {
+ profile.ack();
return null;
}
return event;
@@ -93,9 +92,9 @@ public class TimeBasedFilterInterceptor implements Interceptor {
public static class Builder implements Interceptor.Builder {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static final String START_TIME = "start-time";
+ private static final String START_TIME = "rollback.startTime";
private static final long DEFAULT_START_TIME = 0L;
- private static final String STOP_TIME = "stop-time";
+ private static final String STOP_TIME = "rollback.stopTime";
private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE;
private long startTime;
@@ -108,7 +107,7 @@ public class TimeBasedFilterInterceptor implements Interceptor {
@Override
public void configure(Context context) {
- startTime = Optional.ofNullable(context.getString(START_TIME))
+ startTime = Optional.ofNullable(context.getString(START_TIME))
.map(s -> {
logger.info("config TimeBasedFilterInterceptor, start time is {}", s);
try {
@@ -120,7 +119,7 @@ public class TimeBasedFilterInterceptor implements Interceptor {
})
.orElse(DEFAULT_START_TIME);
- stopTime = Optional.ofNullable(context.getString(STOP_TIME))
+ stopTime = Optional.ofNullable(context.getString(STOP_TIME))
.map(s -> {
logger.info("config TimeBasedFilterInterceptor, stop time is {}", s);
try {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 18cef6065..aee6a11f7 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -108,8 +108,8 @@ public final class SortSdkSource extends AbstractSource
*/
@Override
public synchronized void start() {
- LOG.info("start to SortSdkSource:{}", taskName);
int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+ LOG.info("start to SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum);
for (int i = 0; i < sortSdkClientNum; i++) {
this.sortClients.add(this.newClient(taskName));
}