You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 08:16:00 UTC
[incubator-inlong] branch master updated: [INLONG-2256] SortStandalone support audit sdk. (#2261)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cb2229c [INLONG-2256] SortStandalone support audit sdk. (#2261)
cb2229c is described below
commit cb2229c1c10f790d7b5faea8e7438d28334631f9
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Jan 21 16:15:52 2022 +0800
[INLONG-2256] SortStandalone support audit sdk. (#2261)
---
inlong-sort-standalone/pom.xml | 14 +++
.../sort/standalone/metrics/SortMetricItem.java | 95 ++++++++++++++++-
.../sort-standalone-source/pom.xml | 30 +-----
.../sort/standalone/SortStandaloneApplication.java | 9 ++
.../sort/standalone/metrics/audit/AuditUtils.java | 116 +++++++++++++++++++++
.../sort/standalone/sink/hive/HiveSinkContext.java | 6 +-
.../sink/pulsar/PulsarProducerCluster.java | 2 +
.../source/sortsdk/SortSdkSourceContext.java | 6 +-
8 files changed, 246 insertions(+), 32 deletions(-)
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 6a797b4..84db940 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -56,6 +56,8 @@
<fastjson.version>1.2.79</fastjson.version>
<simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
<log4j.version>2.17.1</log4j.version>
+ <mockito.version>2.23.0</mockito.version>
+ <jakarta.validation.version>2.0.2</jakarta.validation.version>
</properties>
<dependencies>
@@ -118,6 +120,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <artifactId>mockito-core</artifactId>
+ <groupId>org.mockito</groupId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -225,6 +233,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ <scope>compile</scope>
+ <version>${jakarta.validation.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index ae0cf9f..7a744d0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -20,11 +20,14 @@ package org.apache.inlong.sort.standalone.metrics;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.inlong.commons.config.metrics.CountMetric;
import org.apache.inlong.commons.config.metrics.Dimension;
import org.apache.inlong.commons.config.metrics.MetricDomain;
import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.msg.AttributeConstants;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.utils.Constants;
/**
@@ -109,15 +112,101 @@ public class SortMetricItem extends MetricItem {
/**
* fillInlongId
- *
+ *
* @param event
* @param dimensions
*/
public static void fillInlongId(Event event, Map<String, String> dimensions) {
Map<String, String> headers = event.getHeaders();
- String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID, "-");
- String inlongStreamId = headers.getOrDefault(Constants.INLONG_STREAM_ID, "-");
+ String inlongGroupId = getInlongGroupId(headers);
+ String inlongStreamId = getInlongStreamId(headers);
dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
}
+
+ /**
+ * fillAuditFormatTime
+ *
+ * @param event
+ * @param dimensions
+ */
+ public static void fillAuditFormatTime(Event event, Map<String, String> dimensions) {
+ long msgTime = getLogTime(event);
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ }
+
+ /**
+ * getAuditFormatTime
+ *
+ * @param msgTime
+ * @return
+ */
+ public static long getAuditFormatTime(long msgTime) {
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ return auditFormatTime;
+ }
+
+ /**
+ * getInlongGroupId
+ *
+ * @param headers
+ * @return
+ */
+ public static String getInlongGroupId(Map<String, String> headers) {
+ String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+ if (inlongGroupId == null) {
+ inlongGroupId = headers.getOrDefault(Constants.TOPIC, "");
+ }
+ return inlongGroupId;
+ }
+
+ /**
+ * getInlongStreamId
+ *
+ * @param headers
+ * @return
+ */
+ public static String getInlongStreamId(Map<String, String> headers) {
+ String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+ if (inlongStreamId == null) {
+ inlongStreamId = headers.getOrDefault(AttributeConstants.INTERFACE_ID, "");
+ }
+ return inlongStreamId;
+ }
+
+ /**
+ * getLogTime
+ *
+ * @param headers
+ * @return
+ */
+ public static long getLogTime(Map<String, String> headers) {
+ String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME);
+ if (strLogTime == null) {
+ strLogTime = headers.get(AttributeConstants.DATA_TIME);
+ }
+ if (strLogTime == null) {
+ return System.currentTimeMillis();
+ }
+ long logTime = NumberUtils.toLong(strLogTime, 0);
+ if (logTime == 0) {
+ logTime = System.currentTimeMillis();
+ }
+ return logTime;
+ }
+
+ /**
+ * getLogTime
+ *
+ * @param event
+ * @return
+ */
+ public static long getLogTime(Event event) {
+ if (event != null) {
+ Map<String, String> headers = event.getHeaders();
+ return getLogTime(headers);
+ }
+ return System.currentTimeMillis();
+ }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 000f85c..5d17ac8 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -42,39 +42,15 @@
<artifactId>sort-standalone-common</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-sort-sdk</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
- <groupId>jakarta.validation</groupId>
- <artifactId>jakarta.validation-api</artifactId>
- <version>2.0.2</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <artifactId>powermock-module-junit4</artifactId>
- <groupId>org.powermock</groupId>
- <scope>test</scope>
- <version>2.0.2</version>
- </dependency>
-
- <dependency>
- <artifactId>powermock-api-mockito2</artifactId>
- <groupId>org.powermock</groupId>
- <scope>test</scope>
- <version>2.0.2</version>
- </dependency>
-
- <dependency>
- <artifactId>mockito-core</artifactId>
- <groupId>org.mockito</groupId>
- <scope>test</scope>
- <version>2.23.0</version>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index 48c0470..f5d0b3f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone;
import org.apache.flume.node.Application;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.metrics.MetricObserver;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
@@ -44,6 +45,14 @@ public class SortStandaloneApplication {
cluster.start();
// metrics
MetricObserver.init(CommonPropertiesHolder.get());
+ AuditUtils.initAudit();
+ Runtime.getRuntime().addShutdownHook(new Thread("sortstandalone-shutdown-hook") {
+
+ @Override
+ public void run() {
+ AuditUtils.sendReport();
+ }
+ });
Thread.sleep(5000);
} catch (Exception e) {
LOG.error("A fatal error occurred while running. Exception follows.", e);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
new file mode 100644
index 0000000..3a032b2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.audit;
+
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.Event;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+
+/**
+ *
+ * AuditUtils
+ */
+public class AuditUtils {
+
+ public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
+ public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
+ public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
+ public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
+ public static final String AUDIT_KEY_PROXYS = "audit.proxys";
+ public static final String AUDIT_KEY_IS_AUDIT = "audit.isAudit";
+
+ public static final int AUDIT_ID_READ_SUCCESS = 7;
+ public static final int AUDIT_ID_SEND_SUCCESS = 8;
+
+ private static boolean IS_AUDIT = true;
+
+ /**
+ * initAudit
+ */
+ public static void initAudit() {
+ // IS_AUDIT
+ IS_AUDIT = BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
+ if (IS_AUDIT) {
+ // AuditProxy
+ String strIpPorts = CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
+ HashSet<String> proxys = new HashSet<>();
+ if (!StringUtils.isBlank(strIpPorts)) {
+ String[] ipPorts = strIpPorts.split("\\s+");
+ for (String ipPort : ipPorts) {
+ proxys.add(ipPort);
+ }
+ }
+ AuditImp.getInstance().setAuditProxy(proxys);
+ // AuditConfig
+ String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH,
+ AUDIT_DEFAULT_FILE_PATH);
+ int maxCacheRow = NumberUtils.toInt(
+ CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
+ AUDIT_DEFAULT_MAX_CACHE_ROWS);
+ AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+ AuditImp.getInstance().setAuditConfig(auditConfig);
+ }
+ }
+
+ /**
+ * add
+ *
+ * @param auditID
+ * @param event
+ */
+ public static void add(int auditID, ProfileEvent event) {
+ if (IS_AUDIT && event != null) {
+ String inlongGroupId = event.getInlongGroupId();
+ String inlongStreamId = event.getInlongStreamId();
+ long logTime = event.getRawLogTime();
+ AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ }
+ }
+
+ /**
+ * add
+ *
+ * @param auditID
+ * @param event
+ */
+ public static void add(int auditID, Event event) {
+ if (IS_AUDIT && event != null) {
+ Map<String, String> headers = event.getHeaders();
+ String inlongGroupId = SortMetricItem.getInlongGroupId(headers);
+ String inlongStreamId = SortMetricItem.getInlongStreamId(headers);
+ long logTime = SortMetricItem.getLogTime(headers);
+ AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ }
+ }
+
+ /**
+ * sendReport
+ */
+ public static void sendReport() {
+ AuditImp.getInstance().sendReport();
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
index 8bd706f..545de7a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
@@ -39,6 +39,7 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -250,6 +251,9 @@ public class HiveSinkContext extends SinkContext {
if (result) {
metricItem.sendSuccessCount.addAndGet(count);
metricItem.sendSuccessSize.addAndGet(size);
+ currentRecord.getEvents().forEach((event) -> {
+ AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, event);
+ });
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
@@ -272,7 +276,7 @@ public class HiveSinkContext extends SinkContext {
/**
* getHiveConnection
*
- * @return Connection
+ * @return Connection
* @throws SQLException
* @throws ClassNotFoundException
*/
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 993e97f..8e52d03 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -32,6 +32,7 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -273,6 +274,7 @@ public class PulsarProducerCluster implements LifecycleAware {
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+ AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
index 1ec37bd..c857f5f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -22,6 +22,7 @@ import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.source.SourceContext;
import javax.annotation.Nullable;
@@ -81,17 +82,19 @@ public final class SortSdkSourceContext extends SourceContext {
final Map<String, String> dimensions = this.createSortSdkSourceDimensionMap(event, sortId, topic);
final SortMetricItem metricItem = metricItemSet.findMetricItem(dimensions);
final int msgSize = event != null ? event.getBody().length : -1;
- this.reportToMetric(metricItem, fetchResult, msgSize);
+ this.reportToMetric(event, metricItem, fetchResult, msgSize);
}
/**
* Selector of metric report flow.
*
+ * @param event The fetched event. May be <b>null</b> when fetch failed occurs.
* @param item MetricItem that report to.
* @param fetchResult Result of fetching, SUCCESS or FAILURE.
* @param size The fetch length. -1 means fetch failure.
*/
private void reportToMetric(
+ @Nullable final ProfileEvent event,
@NotNull final SortMetricItem item,
final FetchResult fetchResult,
final int size) {
@@ -99,6 +102,7 @@ public final class SortSdkSourceContext extends SourceContext {
switch (fetchResult) {
case SUCCESS:
reportToMetric(item.readSuccessCount, item.readSuccessSize, size);
+ AuditUtils.add(AuditUtils.AUDIT_ID_READ_SUCCESS, event);
break;
case FAILURE:
reportToMetric(item.readFailCount, item.readFailSize, size);