You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/05 09:38:42 UTC

[incubator-inlong] branch master updated: [INLONG][feature][audit] add audit-common module (#1904)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cee052c  [INLONG][feature][audit] add audit-common module (#1904)
cee052c is described below

commit cee052cc7c11dbf48b815ee7f839aa6a212d33a7
Author: baomingyu <ba...@163.com>
AuthorDate: Sun Dec 5 17:38:36 2021 +0800

    [INLONG][feature][audit] add audit-common module (#1904)
---
 codestyle/suppressions.xml                         |   1 +
 inlong-audit/audit-common/pom.xml                  |  28 +++
 .../inlong/audit/consts/AttributeConstants.java    |  27 +++
 .../inlong/audit/consts/ConfigConstants.java       |  50 ++++
 .../org/apache/inlong/audit/file/ConfigHolder.java | 142 +++++++++++
 .../apache/inlong/audit/file/ConfigManager.java    | 261 +++++++++++++++++++++
 .../apache/inlong/audit/file/RemoteConfigJson.java |  54 +++++
 .../audit/file/holder/ConfigUpdateCallback.java    |  29 +++
 .../audit/file/holder/PropertiesConfigHolder.java  | 153 ++++++++++++
 .../org/apache/inlong/audit/http/StatusCode.java   |  29 +++
 .../apache/inlong/audit/protocol/AuditData.java    | 129 ++++++++++
 .../org/apache/inlong/audit/protocol/Commands.java |  69 ++++++
 .../audit-common/src/main/proto/AuditApi.proto     |  75 ++++++
 13 files changed, 1047 insertions(+)

diff --git a/codestyle/suppressions.xml b/codestyle/suppressions.xml
index c7f7a86..e850540 100644
--- a/codestyle/suppressions.xml
+++ b/codestyle/suppressions.xml
@@ -21,6 +21,7 @@
 
 <suppressions>
     <!-- suppress all checks in the generated directories -->
+    <suppress checks=".*" files="AuditApi.java" />
     <suppress checks=".*" files=".+[\\/]classes[\\/].+\.java" />
     <suppress checks=".*" files=".+[\\/]generated[\\/].+\.java" />
     <suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java" />
diff --git a/inlong-audit/audit-common/pom.xml b/inlong-audit/audit-common/pom.xml
index 424986f..7a00175 100644
--- a/inlong-audit/audit-common/pom.xml
+++ b/inlong-audit/audit-common/pom.xml
@@ -39,4 +39,32 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.5.0.Final</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.5.1</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
+                    <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java
new file mode 100644
index 0000000..1bb2ca6
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/AttributeConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.consts;
+
+public interface AttributeConstants {
+
+    String INLONG_GROUP_ID = "inlongGroupId";
+    String INLONG_STREAM_ID = "inlongStreamId";
+
+    String SEPARATOR = "&";
+    String KEY_VALUE_SEPARATOR = "=";
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
new file mode 100644
index 0000000..01729cf
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.consts;
+
+public class ConfigConstants {
+
+    public static final String CONFIG_PORT = "port";
+
+    public static final String CONFIG_HOST = "host";
+
+    public static final String MSG_FACTORY_NAME = "msg-factory-name";
+
+    public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name";
+
+    public static final String MESSAGE_HANDLER_NAME = "message-handler-name";
+
+    public static final String MAX_MSG_LENGTH = "max-msg-length";
+
+    public static final String TCP_NO_DELAY = "tcpNoDelay";
+
+    public static final String KEEP_ALIVE = "keepAlive";
+
+    public static final String HIGH_WATER_MARK = "highWaterMark";
+
+    public static final String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
+
+    public static final String SEND_BUFFER_SIZE = "sendBufferSize";
+
+    public static final String TRAFFIC_CLASS = "trafficClass";
+
+    public static final String MAX_THREADS = "max-threads";
+    
+    public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
+
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java
new file mode 100644
index 0000000..3910d00
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigHolder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import com.google.common.base.Splitter;
+import java.io.File;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.inlong.audit.file.holder.ConfigUpdateCallback;
+import org.apache.inlong.audit.consts.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConfigHolder {
+
+    public static final Splitter.MapSplitter MAP_SPLITTER =
+            Splitter.on(AttributeConstants.SEPARATOR)
+                    .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigHolder.class);
+    private final String fileName;
+    private final AtomicBoolean fileChanged = new AtomicBoolean(false);
+    // list of callbacks for this holder
+    private final List<ConfigUpdateCallback> callbackList = new ArrayList<ConfigUpdateCallback>();
+    private long lastModifyTime;
+    private String filePath;
+    private File configFile;
+    private boolean needToCheckChanged;
+
+    public ConfigHolder(String fileName, boolean needToCheckChanged) {
+        this.fileName = fileName;
+        this.needToCheckChanged = needToCheckChanged;
+        setFilePath(fileName);
+        if (configFile != null) {
+            this.lastModifyTime = configFile.lastModified();
+        }
+    }
+
+    /**
+     * add callback
+     *
+     * @param callback - callback
+     */
+    public void addUpdateCallback(ConfigUpdateCallback callback) {
+        callbackList.add(callback);
+    }
+
+    /**
+     * execute callbacks
+     */
+    public void executeCallbacks() {
+        for (ConfigUpdateCallback callback : callbackList) {
+            callback.update();
+        }
+    }
+
+    /**
+     * load from file to holder
+     */
+    public abstract void loadFromFileToHolder();
+
+    /**
+     * check updater
+     *
+     * @return - true if updated
+     */
+    public boolean checkAndUpdateHolder() {
+        if (fileChanged.compareAndSet(true, false)
+                || (configFile != null && configFile.lastModified() != this.lastModifyTime)) {
+            if (configFile != null) {
+                this.lastModifyTime = configFile.lastModified();
+            }
+            LOG.info("file {} has changed, reload from local file agent", getFileName());
+            loadFromFileToHolder();
+            return true;
+        }
+        return false;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    /**
+     * get file name
+     *
+     * @return file name with prefix
+     */
+    public String getNextBackupFileName() {
+        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+        String dateStr = format.format(new Date(System.currentTimeMillis()));
+        return getFilePath() + "." + dateStr;
+    }
+
+    /**
+     * file name with base path.
+     *
+     * @return
+     */
+    public String getFilePath() {
+        return filePath;
+    }
+
+    private void setFilePath(String fileName) {
+        URL url = getClass().getClassLoader().getResource(fileName);
+        if (url != null) {
+            this.filePath = url.getPath();
+            this.configFile = new File(this.filePath);
+            LOG.info("set file path lastTime: {}, currentTime: {}",
+                    lastModifyTime, configFile.lastModified());
+        }
+    }
+
+    public boolean isNeedToCheckChanged() {
+        return needToCheckChanged;
+    }
+
+    public abstract Map<String, String> getHolder();
+
+    public AtomicBoolean getFileChanged() {
+        return fileChanged;
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
new file mode 100644
index 0000000..0774a4a
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.inlong.audit.file.holder.PropertiesConfigHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
+
+    private static final Map<String, ConfigHolder> holderMap =
+            new ConcurrentHashMap<>();
+
+    private static ConfigManager instance = null;
+
+    private static String DEFAULT_CONFIG_PROPERTIES = "server.properties";
+
+    static {
+        instance = getInstance(DEFAULT_CONFIG_PROPERTIES, true);
+        ReloadConfigWorker reloadProperties = new ReloadConfigWorker(instance);
+        reloadProperties.setDaemon(true);
+        reloadProperties.start();
+    }
+
+    public static ConfigManager getInstance() {
+        return instance;
+    }
+
+    /**
+     * get instance for manager
+     * @return
+     */
+    public static ConfigManager getInstance(String fileName, boolean needToCheckChanged) {
+        synchronized (ConfigManager.class) {
+            if (instance == null) {
+                instance = new ConfigManager();
+            }
+            ConfigHolder holder = holderMap.get(fileName);
+            if (holder == null) {
+                holder = new PropertiesConfigHolder(fileName, needToCheckChanged);
+                holder.loadFromFileToHolder();
+                holderMap.putIfAbsent(fileName, holder);
+            }
+        }
+        return instance;
+    }
+
+    public Map<String, String> getProperties(String fileName) {
+        ConfigHolder holder = holderMap.get(fileName);
+        if (holder != null) {
+            return holder.getHolder();
+        }
+        return null;
+    }
+
+    private boolean updatePropertiesHolder(Map<String, String> result,
+            String holderName, boolean addElseRemove) {
+        if (StringUtils.isNotEmpty(holderName)) {
+            PropertiesConfigHolder holder = (PropertiesConfigHolder)
+                    holderMap.get(holderName + ".properties");
+            return updatePropertiesHolder(result, holder, true);
+        }
+        return true;
+    }
+
+    /**
+     * update old maps, reload local files if changed.
+     *
+     * @param result        - map pending to be added
+     * @param holder        - property holder
+     * @param addElseRemove - if add(true) else remove(false)
+     * @return true if changed else false.
+     */
+    private boolean updatePropertiesHolder(Map<String, String> result,
+                                           PropertiesConfigHolder holder, boolean addElseRemove) {
+        Map<String, String> tmpHolder = holder.forkHolder();
+        boolean changed = false;
+        for (Entry<String, String> entry : result.entrySet()) {
+            String oldValue = addElseRemove
+                    ? tmpHolder.put(entry.getKey(), entry.getValue()) : tmpHolder.remove(entry.getKey());
+            // if addElseRemove is false, that means removing item, changed is true.
+            if (oldValue == null || !oldValue.equals(entry.getValue()) || !addElseRemove) {
+                changed = true;
+            }
+        }
+
+        if (changed) {
+            return holder.loadFromHolderToFile(tmpHolder);
+        } else {
+            return false;
+        }
+    }
+
+    public ConfigHolder getDefaultConfigHolder() {
+        return holderMap.get(DEFAULT_CONFIG_PROPERTIES);
+    }
+
+    public ConfigHolder getConfigHolder(String fileName) {
+        return holderMap.get(fileName);
+    }
+
+    /**
+     * load worker
+     */
+    private static class ReloadConfigWorker extends Thread {
+
+        private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
+        private final ConfigManager configManager;
+        private final CloseableHttpClient httpClient;
+        private final Gson gson = new Gson();
+        private boolean isRunning = true;
+
+        public ReloadConfigWorker(ConfigManager managerInstance) {
+            this.configManager = managerInstance;
+            this.httpClient = constructHttpClient();
+        }
+
+        private synchronized CloseableHttpClient constructHttpClient() {
+            long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+            RequestConfig requestConfig = RequestConfig.custom()
+                    .setConnectTimeout((int) timeoutInMs)
+                    .setSocketTimeout((int) timeoutInMs).build();
+            HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+            httpClientBuilder.setDefaultRequestConfig(requestConfig);
+            return httpClientBuilder.build();
+        }
+
+        public int getRandom(int min, int max) {
+            return (int) (Math.random() * (max + 1 - min)) + min;
+        }
+
+        private long getSleepTime() {
+            String sleepTimeInMsStr =
+                    configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get(
+                            "configCheckIntervalMs");
+            long sleepTimeInMs = 10000;
+            try {
+                if (sleepTimeInMsStr != null) {
+                    sleepTimeInMs = Long.parseLong(sleepTimeInMsStr);
+                }
+            } catch (Exception ignored) {
+                LOG.info("ignored Exception ", ignored);
+            }
+            return sleepTimeInMs + getRandom(0, 5000);
+        }
+
+        public void close() {
+            isRunning = false;
+        }
+
+        private void checkLocalFile() {
+            for (ConfigHolder holder : holderMap.values()) {
+                boolean isChanged = holder.checkAndUpdateHolder();
+                if (isChanged) {
+                    holder.executeCallbacks();
+                }
+            }
+        }
+
+        private boolean checkWithManager(String hostUrl) {
+            HttpGet httpGet = null;
+            try {
+                String url = "http://" + hostUrl + "/api/inlong/manager/openapi/audit/getConfig";
+                LOG.info("start to request {} to get config info", url);
+                httpGet = new HttpGet(url);
+                httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+
+                // request with post
+                CloseableHttpResponse response = httpClient.execute(httpGet);
+                String returnStr = EntityUtils.toString(response.getEntity());
+                // get groupId <-> topic and m value.
+
+                Map<String, String> configJsonMap = gson.fromJson(returnStr, Map.class);
+                if (configJsonMap != null && configJsonMap.size() > 0) {
+                    for (Entry<String, String> entry : configJsonMap.entrySet()) {
+                        Map<String, String> valueMap = gson.fromJson(entry.getValue(), Map.class);
+                        configManager.updatePropertiesHolder(valueMap,
+                                entry.getKey(), true);
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error("exception caught", ex);
+                return false;
+            } finally {
+                if (httpGet != null) {
+                    httpGet.releaseConnection();
+                }
+            }
+            return true;
+        }
+
+        private void checkRemoteConfig() {
+
+            try {
+                String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager_hosts");
+                String[] hostList = StringUtils.split(managerHosts, ",");
+                for (String host : hostList) {
+                    if (checkWithManager(host)) {
+                        break;
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error("exception caught", ex);
+            }
+        }
+
+        @Override
+        public void run() {
+            long count = 0;
+            while (isRunning) {
+
+                long sleepTimeInMs = getSleepTime();
+                count += 1;
+                try {
+                    checkLocalFile();
+                    // wait for 30 seconds to update remote config
+                    if (count % 3 == 0) {
+                        checkRemoteConfig();
+                        count = 0;
+                    }
+                    TimeUnit.MILLISECONDS.sleep(sleepTimeInMs);
+                } catch (Exception ex) {
+                    LOG.error("exception caught", ex);
+                }
+            }
+        }
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
new file mode 100644
index 0000000..9172ff7
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file;
+
+import java.util.List;
+
+public class RemoteConfigJson {
+
+    private boolean result;
+    private List<DataItem> data;
+    private int errCode;
+
+    public List<DataItem> getData() {
+        return data;
+    }
+
+    public int getErrCode() {
+        return errCode;
+    }
+
+    public static class DataItem {
+
+        private String groupId;
+        private String topic;
+        private String m;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public String getM() {
+            return m;
+        }
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java
new file mode 100644
index 0000000..cacf3e0
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/ConfigUpdateCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file.holder;
+
+/**
+ * update callback
+ */
+public interface ConfigUpdateCallback {
+
+    /**
+     * update
+     */
+    void update();
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java
new file mode 100644
index 0000000..1afe7c5
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/holder/PropertiesConfigHolder.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.file.holder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
+import org.apache.inlong.audit.file.ConfigHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * properties to map
+ */
+public class PropertiesConfigHolder extends ConfigHolder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PropertiesConfigHolder.class);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private Map<String, String> holder;
+
+    public PropertiesConfigHolder(String fileName, boolean needToCheckChanged) {
+        super(fileName, needToCheckChanged);
+    }
+
+    @Override
+    public void loadFromFileToHolder() {
+        readWriteLock.readLock().lock();
+        try {
+            Map<String, String> tmpHolder = loadProperties();
+            LOG.info(getFileName() + " load content {}", tmpHolder);
+            holder = tmpHolder;
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * holder
+     *
+     * @return
+     */
+    public Map<String, String> forkHolder() {
+        Map<String, String> tmpHolder = new HashMap<String, String>();
+        if (holder != null) {
+            for (Map.Entry<String, String> entry : holder.entrySet()) {
+                tmpHolder.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return tmpHolder;
+    }
+
+    private List<String> getStringListFromHolder(Map<String, String> tmpHolder) {
+        List<String> result = new ArrayList<String>();
+        for (Map.Entry<String, String> entry : tmpHolder.entrySet()) {
+            result.add(entry.getKey() + "=" + entry.getValue());
+        }
+        return result;
+    }
+
+    /**
+     * load from holder
+     * @param tmpHolder
+     * @return
+     */
+    public boolean loadFromHolderToFile(Map<String, String> tmpHolder) {
+        readWriteLock.writeLock().lock();
+        boolean isSuccess = false;
+        try {
+            File sourceFile = new File(getFilePath());
+            File targetFile = new File(getNextBackupFileName());
+            File tmpNewFile = new File(getFileName() + ".tmp");
+
+            if (sourceFile.exists()) {
+                FileUtils.copyFile(sourceFile, targetFile);
+            }
+
+            List<String> lines = getStringListFromHolder(tmpHolder);
+            FileUtils.writeLines(tmpNewFile, lines);
+
+            FileUtils.copyFile(tmpNewFile, sourceFile);
+            tmpNewFile.delete();
+            isSuccess = true;
+            getFileChanged().set(true);
+        } catch (Exception ex) {
+            LOG.error("error in writing file", ex);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+        return isSuccess;
+    }
+
+    protected Map<String, String> loadProperties() {
+        Map<String, String> result = new HashMap<String, String>();
+        InputStream inStream = null;
+        try {
+            URL url = getClass().getClassLoader().getResource(getFileName());
+            inStream = url != null ? url.openStream() : null;
+
+            if (inStream == null) {
+                LOG.error("InputStream {} is null!", getFileName());
+            }
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", getFileName(), e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", getFileName(), e);
+        } finally {
+            if (null != inStream) {
+                try {
+                    inStream.close();
+                } catch (IOException e) {
+                    LOG.error("fail to loadTopics, inStream.close ,and e= {}", getFileName(), e);
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public Map<String, String> getHolder() {
+        return holder;
+    }
+
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java
new file mode 100644
index 0000000..d45bc41
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/http/StatusCode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.http;
+
+public class StatusCode {
+
+    public static final int SUCCESS = 1;
+
+    public static final int ILLEGAL_ARGUMENT = -100;
+
+    public static final int EXCEED_LEN = -101;
+
+    public static final int SERVICE_ERR = -105;
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
new file mode 100644
index 0000000..422fc37
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.protocol;
+
+public class AuditData {
+    private String ip;
+    private String dockerId;
+    private String threadId;
+    private long sdkTs;
+    private long packetId;
+    private long logTs;
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String auditId;
+    private long count;
+    private long size;
+    private long delay;
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public String getDockerId() {
+        return dockerId;
+    }
+
+    public void setDockerId(String dockerId) {
+        this.dockerId = dockerId;
+    }
+
+    public String getThreadId() {
+        return threadId;
+    }
+
+    public void setThreadId(String threadId) {
+        this.threadId = threadId;
+    }
+
+    public long getSdkTs() {
+        return sdkTs;
+    }
+
+    public void setSdkTs(long sdkTs) {
+        this.sdkTs = sdkTs;
+    }
+
+    public long getPacketId() {
+        return packetId;
+    }
+
+    public void setPacketId(long packetId) {
+        this.packetId = packetId;
+    }
+
+    public long getLogTs() {
+        return logTs;
+    }
+
+    public void setLogTs(long logTs) {
+        this.logTs = logTs;
+    }
+
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    public String getAuditId() {
+        return auditId;
+    }
+
+    public void setAuditId(String auditId) {
+        this.auditId = auditId;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
new file mode 100644
index 0000000..c5f6c05
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.protocol;
+
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
+import org.apache.inlong.audit.protocol.AuditApi.AuditRequest;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type;
+import org.apache.inlong.audit.protocol.AuditApi.Pong;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+public class Commands {
+
+    public static int HEAD_LENGTH = 4;
+
+    public static ChannelBuffer getPongChannelBuffer() {
+        BaseCommand cmdPong = BaseCommand.newBuilder()
+                .setType(Type.PONG)
+                .setPong(Pong.getDefaultInstance()).build();
+        return getChannelBuffer(cmdPong.toByteArray());
+    }
+
+    public static ChannelBuffer getPingChannelBuffer() {
+        BaseCommand cmdPing = BaseCommand.newBuilder()
+                .setType(Type.PING)
+                .setPong(Pong.getDefaultInstance()).build();
+        return getChannelBuffer(cmdPing.toByteArray());
+    }
+
+    public static ChannelBuffer getAuditRequestBuffer(AuditRequest auditRequest) {
+        BaseCommand cmdAuditRequest = BaseCommand.newBuilder()
+                .setType(Type.AUDITREQUEST)
+                .setAuditRequest(auditRequest).build();
+        return getChannelBuffer(cmdAuditRequest.toByteArray());
+    }
+
+    public static ChannelBuffer getAuditReplylBuffer(AuditReply auditReply) {
+        BaseCommand cmdAuditReply = BaseCommand.newBuilder()
+                .setType(Type.AUDITREPLY)
+                .setAuditReply(auditReply).build();
+        return getChannelBuffer(cmdAuditReply.toByteArray());
+    }
+
+    private static ChannelBuffer getChannelBuffer(byte[] body) {
+        /* [totalSize] | [body]*/
+        int totalLength = body.length;
+        ChannelBuffer cmdPingBuffer = ChannelBuffers.buffer(
+                HEAD_LENGTH + totalLength);
+        cmdPingBuffer.writeInt(totalLength);
+        cmdPingBuffer.writeBytes(body);
+        return cmdPingBuffer;
+    }
+}
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
new file mode 100644
index 0000000..ce98fcd
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.inlong.audit.protocol;
+
+message BaseCommand {
+    enum Type {
+        PING          = 0;
+        PONG          = 1;
+        AUDITREQUEST  = 2;
+        AUDITREPLY    = 3;
+    }
+    Type type                            = 1;
+    AuditRequest audit_request  = 2;
+    AuditReply audit_reply      = 3;
+    Ping ping                   = 4;
+    Pong pong                   = 5;
+}
+
+message Ping {
+}
+
+message Pong {
+}
+
+message AuditRequest {
+  uint64 request_id = 1;
+  AuditMessageHeader msg_header = 2;
+  repeated AuditMessageBody msg_body = 3;
+}
+
+message AuditMessageHeader {
+  string ip = 1;
+  string docker_id = 2;
+  string thread_id = 3;
+  uint64 sdk_ts = 4;
+  uint64 packet_id = 5;
+}
+
+message AuditMessageBody {
+  uint64 log_ts = 1;
+  string inlong_group_id= 2;
+  string inlong_stream_id= 3;
+  string audit_id = 4;
+  uint64 count = 5;
+  uint64 size = 6;
+  int64  delay = 7;
+}
+
+message AuditReply {
+  enum RSP_CODE {
+    SUCCESS  = 0;
+    FAILED   = 1;
+    DISASTER = 2;
+  }
+  uint64 request_id = 1;
+  RSP_CODE rsp_code = 2;
+  string message = 3;
+}