You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/05 09:38:42 UTC
[incubator-inlong] branch master updated: [INLONG][feature][audit] add audit-common module (#1904)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cee052c [INLONG][feature][audit] add audit-common module (#1904)
cee052c is described below
commit cee052cc7c11dbf48b815ee7f839aa6a212d33a7
Author: baomingyu <ba...@163.com>
AuthorDate: Sun Dec 5 17:38:36 2021 +0800
[INLONG][feature][audit] add audit-common module (#1904)
---
codestyle/suppressions.xml | 1 +
inlong-audit/audit-common/pom.xml | 28 +++
.../inlong/audit/consts/AttributeConstants.java | 27 +++
.../inlong/audit/consts/ConfigConstants.java | 50 ++++
.../org/apache/inlong/audit/file/ConfigHolder.java | 142 +++++++++++
.../apache/inlong/audit/file/ConfigManager.java | 261 +++++++++++++++++++++
.../apache/inlong/audit/file/RemoteConfigJson.java | 54 +++++
.../audit/file/holder/ConfigUpdateCallback.java | 29 +++
.../audit/file/holder/PropertiesConfigHolder.java | 153 ++++++++++++
.../org/apache/inlong/audit/http/StatusCode.java | 29 +++
.../apache/inlong/audit/protocol/AuditData.java | 129 ++++++++++
.../org/apache/inlong/audit/protocol/Commands.java | 69 ++++++
.../audit-common/src/main/proto/AuditApi.proto | 75 ++++++
13 files changed, 1047 insertions(+)
diff --git a/codestyle/suppressions.xml b/codestyle/suppressions.xml
index c7f7a86..e850540 100644
--- a/codestyle/suppressions.xml
+++ b/codestyle/suppressions.xml
@@ -21,6 +21,7 @@
<suppressions>
<!-- suppress all checks in the generated directories -->
+ <suppress checks=".*" files="AuditApi.java" />
<suppress checks=".*" files=".+[\\/]classes[\\/].+\.java" />
<suppress checks=".*" files=".+[\\/]generated[\\/].+\.java" />
<suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java" />
diff --git a/inlong-audit/audit-common/pom.xml b/inlong-audit/audit-common/pom.xml
index 424986f..7a00175 100644
--- a/inlong-audit/audit-common/pom.xml
+++ b/inlong-audit/audit-common/pom.xml
@@ -39,4 +39,32 @@
</dependency>
</dependencies>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.5.0.Final</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <extensions>true</extensions>
+ <configuration>
+ <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
+ <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java
new file mode 100644
index 0000000..1bb2ca6
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.consts;
+
+public interface AttributeConstants {
+
+ String INLONG_GROUP_ID = "inlongGroupId";
+ String INLONG_STREAM_ID = "inlongStreamId";
+
+ String SEPARATOR = "&";
+ String KEY_VALUE_SEPARATOR = "=";
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
new file mode 100644
index 0000000..01729cf
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.consts;
+
+public class ConfigConstants {
+
+ public static final String CONFIG_PORT = "port";
+
+ public static final String CONFIG_HOST = "host";
+
+ public static final String MSG_FACTORY_NAME = "msg-factory-name";
+
+ public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name";
+
+ public static final String MESSAGE_HANDLER_NAME = "message-handler-name";
+
+ public static final String MAX_MSG_LENGTH = "max-msg-length";
+
+ public static final String TCP_NO_DELAY = "tcpNoDelay";
+
+ public static final String KEEP_ALIVE = "keepAlive";
+
+ public static final String HIGH_WATER_MARK = "highWaterMark";
+
+ public static final String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
+
+ public static final String SEND_BUFFER_SIZE = "sendBufferSize";
+
+ public static final String TRAFFIC_CLASS = "trafficClass";
+
+ public static final String MAX_THREADS = "max-threads";
+
+ public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
+
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java
new file mode 100644
index 0000000..3910d00
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import com.google.common.base.Splitter;
+import java.io.File;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.inlong.audit.file.holder.ConfigUpdateCallback;
+import org.apache.inlong.audit.consts.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConfigHolder {
+
+ public static final Splitter.MapSplitter MAP_SPLITTER =
+ Splitter.on(AttributeConstants.SEPARATOR)
+ .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigHolder.class);
+ private final String fileName;
+ private final AtomicBoolean fileChanged = new AtomicBoolean(false);
+ // list of callbacks for this holder
+ private final List<ConfigUpdateCallback> callbackList = new ArrayList<ConfigUpdateCallback>();
+ private long lastModifyTime;
+ private String filePath;
+ private File configFile;
+ private boolean needToCheckChanged;
+
+ public ConfigHolder(String fileName, boolean needToCheckChanged) {
+ this.fileName = fileName;
+ this.needToCheckChanged = needToCheckChanged;
+ setFilePath(fileName);
+ if (configFile != null) {
+ this.lastModifyTime = configFile.lastModified();
+ }
+ }
+
+ /**
+ * add callback
+ *
+ * @param callback - callback
+ */
+ public void addUpdateCallback(ConfigUpdateCallback callback) {
+ callbackList.add(callback);
+ }
+
+ /**
+ * execute callbacks
+ */
+ public void executeCallbacks() {
+ for (ConfigUpdateCallback callback : callbackList) {
+ callback.update();
+ }
+ }
+
+ /**
+ * load from file to holder
+ */
+ public abstract void loadFromFileToHolder();
+
+ /**
+ * check updater
+ *
+ * @return - true if updated
+ */
+ public boolean checkAndUpdateHolder() {
+ if (fileChanged.compareAndSet(true, false)
+ || (configFile != null && configFile.lastModified() != this.lastModifyTime)) {
+ if (configFile != null) {
+ this.lastModifyTime = configFile.lastModified();
+ }
+ LOG.info("file {} has changed, reload from local file agent", getFileName());
+ loadFromFileToHolder();
+ return true;
+ }
+ return false;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ /**
+ * build the name of the next backup file
+ *
+ * @return the config file path followed by "." and the current time as yyyyMMddHHmmss
+ */
+ public String getNextBackupFileName() {
+ SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+ String dateStr = format.format(new Date(System.currentTimeMillis()));
+ return getFilePath() + "." + dateStr;
+ }
+
+ /**
+ * file name with base path.
+ *
+ * @return
+ */
+ public String getFilePath() {
+ return filePath;
+ }
+
+ private void setFilePath(String fileName) {
+ URL url = getClass().getClassLoader().getResource(fileName);
+ if (url != null) {
+ this.filePath = url.getPath();
+ this.configFile = new File(this.filePath);
+ LOG.info("set file path lastTime: {}, currentTime: {}",
+ lastModifyTime, configFile.lastModified());
+ }
+ }
+
+ public boolean isNeedToCheckChanged() {
+ return needToCheckChanged;
+ }
+
+ public abstract Map<String, String> getHolder();
+
+ public AtomicBoolean getFileChanged() {
+ return fileChanged;
+ }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
new file mode 100644
index 0000000..0774a4a
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.inlong.audit.file.holder.PropertiesConfigHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of config holders keyed by file name; its static initializer starts the reload worker.
+ */
+public class ConfigManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
+
+ private static final Map<String, ConfigHolder> holderMap =
+ new ConcurrentHashMap<>();
+
+ private static ConfigManager instance = null;
+
+ private static String DEFAULT_CONFIG_PROPERTIES = "server.properties";
+
+ static {
+ instance = getInstance(DEFAULT_CONFIG_PROPERTIES, true);
+ ReloadConfigWorker reloadProperties = new ReloadConfigWorker(instance);
+ reloadProperties.setDaemon(true);
+ reloadProperties.start();
+ }
+
+ public static ConfigManager getInstance() {
+ return instance;
+ }
+
+ /**
+ * get instance for manager
+ * @return
+ */
+ public static ConfigManager getInstance(String fileName, boolean needToCheckChanged) {
+ synchronized (ConfigManager.class) {
+ if (instance == null) {
+ instance = new ConfigManager();
+ }
+ ConfigHolder holder = holderMap.get(fileName);
+ if (holder == null) {
+ holder = new PropertiesConfigHolder(fileName, needToCheckChanged);
+ holder.loadFromFileToHolder();
+ holderMap.putIfAbsent(fileName, holder);
+ }
+ }
+ return instance;
+ }
+
+ public Map<String, String> getProperties(String fileName) {
+ ConfigHolder holder = holderMap.get(fileName);
+ if (holder != null) {
+ return holder.getHolder();
+ }
+ return null;
+ }
+
+ private boolean updatePropertiesHolder(Map<String, String> result,
+ String holderName, boolean addElseRemove) {
+ if (StringUtils.isNotEmpty(holderName)) {
+ PropertiesConfigHolder holder = (PropertiesConfigHolder) holderMap.get(holderName + ".properties");
+ // unknown holder names or a null map are skipped (no NPE); the caller's add/remove flag is forwarded
+ return holder != null && result != null && updatePropertiesHolder(result, holder, addElseRemove);
+ }
+ return true;
+ }
+
+ /**
+ * update old maps, reload local files if changed.
+ *
+ * @param result - map pending to be added
+ * @param holder - property holder
+ * @param addElseRemove - if add(true) else remove(false)
+ * @return true if changed else false.
+ */
+ private boolean updatePropertiesHolder(Map<String, String> result,
+ PropertiesConfigHolder holder, boolean addElseRemove) {
+ Map<String, String> tmpHolder = holder.forkHolder();
+ boolean changed = false;
+ for (Entry<String, String> entry : result.entrySet()) {
+ String oldValue = addElseRemove
+ ? tmpHolder.put(entry.getKey(), entry.getValue()) : tmpHolder.remove(entry.getKey());
+ // if addElseRemove is false, that means removing item, changed is true.
+ if (oldValue == null || !oldValue.equals(entry.getValue()) || !addElseRemove) {
+ changed = true;
+ }
+ }
+
+ if (changed) {
+ return holder.loadFromHolderToFile(tmpHolder);
+ } else {
+ return false;
+ }
+ }
+
+ public ConfigHolder getDefaultConfigHolder() {
+ return holderMap.get(DEFAULT_CONFIG_PROPERTIES);
+ }
+
+ public ConfigHolder getConfigHolder(String fileName) {
+ return holderMap.get(fileName);
+ }
+
+ /**
+ * load worker
+ */
+ private static class ReloadConfigWorker extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
+ private final ConfigManager configManager;
+ private final CloseableHttpClient httpClient;
+ private final Gson gson = new Gson();
+ private volatile boolean isRunning = true; // volatile: set by close(), polled by the run() loop
+
+ public ReloadConfigWorker(ConfigManager managerInstance) {
+ this.configManager = managerInstance;
+ this.httpClient = constructHttpClient();
+ }
+
+ private synchronized CloseableHttpClient constructHttpClient() {
+ long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout((int) timeoutInMs)
+ .setSocketTimeout((int) timeoutInMs).build();
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.setDefaultRequestConfig(requestConfig);
+ return httpClientBuilder.build();
+ }
+
+ public int getRandom(int min, int max) {
+ return (int) (Math.random() * (max + 1 - min)) + min;
+ }
+
+ private long getSleepTime() {
+ String sleepTimeInMsStr = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("configCheckIntervalMs");
+ long sleepTimeInMs = 10000;
+ try {
+ if (sleepTimeInMsStr != null) {
+ // trim: Properties.load keeps trailing blanks; non-positive values keep the 10000 ms default
+ long configured = Long.parseLong(sleepTimeInMsStr.trim());
+ sleepTimeInMs = configured > 0 ? configured : sleepTimeInMs;
+ }
+ } catch (Exception ignored) {
+ LOG.info("ignored Exception ", ignored);
+ }
+ return sleepTimeInMs + getRandom(0, 5000);
+ }
+
+ public void close() {
+ isRunning = false;
+ }
+
+ private void checkLocalFile() {
+ for (ConfigHolder holder : holderMap.values()) {
+ boolean isChanged = holder.checkAndUpdateHolder();
+ if (isChanged) {
+ holder.executeCallbacks();
+ }
+ }
+ }
+
+ private boolean checkWithManager(String hostUrl) {
+ HttpGet httpGet = null;
+ try {
+ String url = "http://" + hostUrl + "/api/inlong/manager/openapi/audit/getConfig";
+ LOG.info("start to request {} to get config info", url);
+ httpGet = new HttpGet(url);
+ httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+
+ // issue the GET request and read the whole response body as text
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+ String returnStr = EntityUtils.toString(response.getEntity());
+ // the body is parsed as a JSON object: holder name -> JSON text holding that holder's entries
+ // NOTE(review): the status code is not checked and Map.class is raw - confirm values are strings
+ Map<String, String> configJsonMap = gson.fromJson(returnStr, Map.class);
+ if (configJsonMap != null && configJsonMap.size() > 0) {
+ for (Entry<String, String> entry : configJsonMap.entrySet()) {
+ Map<String, String> valueMap = gson.fromJson(entry.getValue(), Map.class);
+ configManager.updatePropertiesHolder(valueMap,
+ entry.getKey(), true);
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("exception caught", ex);
+ return false;
+ } finally {
+ if (httpGet != null) {
+ httpGet.releaseConnection();
+ }
+ }
+ return true; // also reached when the parsed map was null or empty
+ }
+
+ private void checkRemoteConfig() {
+
+ try {
+ String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager_hosts");
+ String[] hostList = StringUtils.split(managerHosts, ",");
+ for (String host : hostList) {
+ if (checkWithManager(host)) {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("exception caught", ex);
+ }
+ }
+
+ @Override
+ public void run() {
+ // count starts at 1 and advances after every pass, including passes that threw
+ for (long count = 1; isRunning; count++) {
+ long sleepTimeInMs = getSleepTime();
+ try {
+ checkLocalFile();
+ // poll the manager only on every third pass
+ if (count % 3 == 0) {
+ checkRemoteConfig();
+ }
+ TimeUnit.MILLISECONDS.sleep(sleepTimeInMs);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the flag for whoever interrupted us
+ return; // stop instead of logging and looping again
+ } catch (Exception ex) {
+ LOG.error("exception caught", ex);
+ }
+ }
+ }
+ }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
new file mode 100644
index 0000000..9172ff7
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import java.util.List;
+
+public class RemoteConfigJson {
+
+ private boolean result;
+ private List<DataItem> data;
+ private int errCode;
+
+ public List<DataItem> getData() {
+ return data;
+ }
+
+ public int getErrCode() {
+ return errCode;
+ }
+
+ public static class DataItem {
+
+ private String groupId;
+ private String topic;
+ private String m;
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getM() {
+ return m;
+ }
+ }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java
new file mode 100644
index 0000000..cacf3e0
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file.holder;
+
+/**
+ * update callback
+ */
+public interface ConfigUpdateCallback {
+
+ /**
+ * update
+ */
+ void update();
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java
new file mode 100644
index 0000000..1afe7c5
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file.holder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
+import org.apache.inlong.audit.file.ConfigHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * properties to map
+ */
+public class PropertiesConfigHolder extends ConfigHolder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PropertiesConfigHolder.class);
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private Map<String, String> holder;
+
+ public PropertiesConfigHolder(String fileName, boolean needToCheckChanged) {
+ super(fileName, needToCheckChanged);
+ }
+
+ @Override
+ public void loadFromFileToHolder() {
+ readWriteLock.writeLock().lock(); // replacing holder is a write; excludes loadFromHolderToFile
+ try {
+ Map<String, String> tmpHolder = loadProperties();
+ LOG.info("{} load content {}", getFileName(), tmpHolder);
+ holder = tmpHolder;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * holder
+ *
+ * @return
+ */
+ public Map<String, String> forkHolder() {
+ Map<String, String> tmpHolder = new HashMap<String, String>();
+ if (holder != null) {
+ for (Map.Entry<String, String> entry : holder.entrySet()) {
+ tmpHolder.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return tmpHolder;
+ }
+
+ private List<String> getStringListFromHolder(Map<String, String> tmpHolder) {
+ List<String> result = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : tmpHolder.entrySet()) {
+ result.add(entry.getKey() + "=" + entry.getValue());
+ }
+ return result;
+ }
+
+ /**
+ * load from holder
+ * @param tmpHolder
+ * @return
+ */
+ public boolean loadFromHolderToFile(Map<String, String> tmpHolder) {
+ readWriteLock.writeLock().lock();
+ boolean isSuccess = false;
+ try {
+ File sourceFile = new File(getFilePath());
+ File targetFile = new File(getNextBackupFileName());
+ File tmpNewFile = new File(getFileName() + ".tmp");
+
+ if (sourceFile.exists()) {
+ FileUtils.copyFile(sourceFile, targetFile);
+ }
+
+ List<String> lines = getStringListFromHolder(tmpHolder);
+ FileUtils.writeLines(tmpNewFile, lines);
+
+ FileUtils.copyFile(tmpNewFile, sourceFile);
+ tmpNewFile.delete();
+ isSuccess = true;
+ getFileChanged().set(true);
+ } catch (Exception ex) {
+ LOG.error("error in writing file", ex);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ return isSuccess;
+ }
+
+ protected Map<String, String> loadProperties() {
+ Map<String, String> result = new HashMap<String, String>(); // returned empty on any failure
+ InputStream inStream = null;
+ try {
+ URL url = getClass().getClassLoader().getResource(getFileName());
+ inStream = url != null ? url.openStream() : null;
+ if (inStream == null) {
+ LOG.error("InputStream {} is null!", getFileName());
+ return result; // resource not on the classpath: do not hand null to Properties.load
+ }
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", getFileName(), e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", getFileName(), e);
+ } finally {
+ if (null != inStream) {
+ try {
+ inStream.close();
+ } catch (IOException e) {
+ LOG.error("fail to close inStream, file ={}", getFileName(), e);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Map<String, String> getHolder() {
+ return holder;
+ }
+
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java
new file mode 100644
index 0000000..d45bc41
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.http;
+
+public class StatusCode {
+
+ public static final int SUCCESS = 1;
+
+ public static final int ILLEGAL_ARGUMENT = -100;
+
+ public static final int EXCEED_LEN = -101;
+
+ public static final int SERVICE_ERR = -105;
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
new file mode 100644
index 0000000..422fc37
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.protocol;
+
+public class AuditData {
+ private String ip;
+ private String dockerId;
+ private String threadId;
+ private long sdkTs;
+ private long packetId;
+ private long logTs;
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private String auditId;
+ private long count;
+ private long size;
+ private long delay;
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public String getDockerId() {
+ return dockerId;
+ }
+
+ public void setDockerId(String dockerId) {
+ this.dockerId = dockerId;
+ }
+
+ public String getThreadId() {
+ return threadId;
+ }
+
+ public void setThreadId(String threadId) {
+ this.threadId = threadId;
+ }
+
+ public long getSdkTs() {
+ return sdkTs;
+ }
+
+ public void setSdkTs(long sdkTs) {
+ this.sdkTs = sdkTs;
+ }
+
+ public long getPacketId() {
+ return packetId;
+ }
+
+ public void setPacketId(long packetId) {
+ this.packetId = packetId;
+ }
+
+ public long getLogTs() {
+ return logTs;
+ }
+
+ public void setLogTs(long logTs) {
+ this.logTs = logTs;
+ }
+
+ public String getInlongGroupId() {
+ return inlongGroupId;
+ }
+
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
+ }
+
+ public String getInlongStreamId() {
+ return inlongStreamId;
+ }
+
+ public void setInlongStreamId(String inlongStreamId) {
+ this.inlongStreamId = inlongStreamId;
+ }
+
+ public String getAuditId() {
+ return auditId;
+ }
+
+ public void setAuditId(String auditId) {
+ this.auditId = auditId;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(long count) {
+ this.count = count;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
new file mode 100644
index 0000000..c5f6c05
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.protocol;
+
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
+import org.apache.inlong.audit.protocol.AuditApi.AuditRequest;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type;
+import org.apache.inlong.audit.protocol.AuditApi.Pong;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * Static factories that serialize {@link BaseCommand} messages and wrap them in a
+ * length-prefixed {@link ChannelBuffer}.
+ *
+ * <p>Every buffer produced here has the layout {@code [int body length][serialized body]}.
+ */
+public class Commands {
+
+    /** Number of bytes reserved for the length prefix written in front of every body. */
+    public static final int HEAD_LENGTH = 4;
+
+    /**
+     * Builds a framed {@code PONG} command carrying the default (empty) {@code Pong} payload.
+     *
+     * @return a buffer holding the length prefix followed by the serialized command
+     */
+    public static ChannelBuffer getPongChannelBuffer() {
+        BaseCommand cmdPong = BaseCommand.newBuilder()
+                .setType(Type.PONG)
+                .setPong(Pong.getDefaultInstance()).build();
+        return getChannelBuffer(cmdPong.toByteArray());
+    }
+
+    /**
+     * Builds a framed {@code PING} command carrying the default (empty) {@code Ping} payload.
+     *
+     * @return a buffer holding the length prefix followed by the serialized command
+     */
+    public static ChannelBuffer getPingChannelBuffer() {
+        // A PING must populate the ping payload, not the pong one. AuditApi lives in this
+        // package, so the nested Ping type is reachable through it without an extra import.
+        BaseCommand cmdPing = BaseCommand.newBuilder()
+                .setType(Type.PING)
+                .setPing(AuditApi.Ping.getDefaultInstance()).build();
+        return getChannelBuffer(cmdPing.toByteArray());
+    }
+
+    /**
+     * Builds a framed {@code AUDITREQUEST} command around the given request.
+     *
+     * @param auditRequest the request to embed as the command payload
+     * @return a buffer holding the length prefix followed by the serialized command
+     */
+    public static ChannelBuffer getAuditRequestBuffer(AuditRequest auditRequest) {
+        BaseCommand cmdAuditRequest = BaseCommand.newBuilder()
+                .setType(Type.AUDITREQUEST)
+                .setAuditRequest(auditRequest).build();
+        return getChannelBuffer(cmdAuditRequest.toByteArray());
+    }
+
+    /**
+     * Builds a framed {@code AUDITREPLY} command around the given reply.
+     *
+     * <p>The method name contains a typo ("Replyl"); it is kept unchanged so existing
+     * callers continue to compile.
+     *
+     * @param auditReply the reply to embed as the command payload
+     * @return a buffer holding the length prefix followed by the serialized command
+     */
+    public static ChannelBuffer getAuditReplylBuffer(AuditReply auditReply) {
+        BaseCommand cmdAuditReply = BaseCommand.newBuilder()
+                .setType(Type.AUDITREPLY)
+                .setAuditReply(auditReply).build();
+        return getChannelBuffer(cmdAuditReply.toByteArray());
+    }
+
+    /**
+     * Frames a serialized command as {@code [body length] | [body]}.
+     *
+     * @param body the serialized command bytes
+     * @return a buffer of exactly {@code HEAD_LENGTH + body.length} bytes capacity
+     */
+    private static ChannelBuffer getChannelBuffer(byte[] body) {
+        int bodyLength = body.length;
+        ChannelBuffer buffer = ChannelBuffers.buffer(HEAD_LENGTH + bodyLength);
+        // The prefix counts only the body, not the prefix itself.
+        buffer.writeInt(bodyLength);
+        buffer.writeBytes(body);
+        return buffer;
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
new file mode 100644
index 0000000..ce98fcd
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.inlong.audit.protocol;
+
+// Top-level envelope for every command: `type` names the command kind and the
+// remaining fields hold one payload message per kind.
+// NOTE(review): presumably only the payload matching `type` is populated -- confirm against senders.
+message BaseCommand {
+ // Command kinds. PING is the zero value and therefore the proto3 default for `type`.
+ enum Type {
+ PING = 0;
+ PONG = 1;
+ AUDITREQUEST = 2;
+ AUDITREPLY = 3;
+ }
+ Type type = 1;
+ // Payload for AUDITREQUEST.
+ AuditRequest audit_request = 2;
+ // Payload for AUDITREPLY.
+ AuditReply audit_reply = 3;
+ // Payload for PING (an empty message).
+ Ping ping = 4;
+ // Payload for PONG (an empty message).
+ Pong pong = 5;
+}
+
+// Payload of a PING command; intentionally declares no fields.
+message Ping {
+}
+
+// Payload of a PONG command; intentionally declares no fields.
+message Pong {
+}
+
+// One audit report: a single header plus any number of body records.
+message AuditRequest {
+ // AuditReply declares a field of the same name (presumably used to correlate
+ // a reply with its request -- confirm against sender/receiver code).
+ uint64 request_id = 1;
+ // Header that applies to the whole request.
+ AuditMessageHeader msg_header = 2;
+ // Zero or more body records.
+ repeated AuditMessageBody msg_body = 3;
+}
+
+// Header attached once per AuditRequest.
+// NOTE(review): field meanings are inferred from their names only (sender ip,
+// container id, thread id, SDK-side timestamp, packet sequence id) -- confirm
+// against the code that fills them in; the timestamp unit is not visible here.
+message AuditMessageHeader {
+ string ip = 1;
+ string docker_id = 2;
+ string thread_id = 3;
+ uint64 sdk_ts = 4;
+ uint64 packet_id = 5;
+}
+
+// One audit record; an AuditRequest carries a repeated list of these.
+// NOTE(review): units of log_ts, size and delay are not visible here -- confirm
+// against the code that fills them in.
+message AuditMessageBody {
+    uint64 log_ts           = 1;
+    string inlong_group_id  = 2;
+    string inlong_stream_id = 3;
+    string audit_id         = 4;
+    uint64 count            = 5;
+    uint64 size             = 6;
+    // Signed, unlike the other numeric fields of this message.
+    int64  delay            = 7;
+}
+
+// Reply payload carried by an AUDITREPLY command.
+message AuditReply {
+ // Result codes. SUCCESS is the zero value and therefore the proto3 default for `rsp_code`.
+ // NOTE(review): how FAILED and DISASTER differ is not visible here -- confirm with the server code.
+ enum RSP_CODE {
+ SUCCESS = 0;
+ FAILED = 1;
+ DISASTER = 2;
+ }
+ // AuditRequest declares a field of the same name (presumably echoed back -- confirm).
+ uint64 request_id = 1;
+ RSP_CODE rsp_code = 2;
+ // Free-form text accompanying the result code.
+ string message = 3;
+}