You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/10 02:09:35 UTC

[skywalking] 01/01: Add file change detection mechanism

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch vault-support
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 8ac1e236b6efe36beb08146dd41cb56f74b13f47
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 10 10:09:17 2020 +0800

    Add file change detection mechanism
---
 .../core/storage/ttl/DataTTLKeeperTimer.java       |  14 +-
 .../oap/server/library/util/FileChangeMonitor.java | 156 +++++++++++++++++++++
 .../server/library/util/FileChangeMonitorTest.java |  85 +++++++++++
 3 files changed, 248 insertions(+), 7 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index d83049a..7477ca8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
@@ -46,11 +47,10 @@ import org.slf4j.LoggerFactory;
  * override TTL, which could be more suitable for the implementation. No matter which TTL configurations are set, they
  * are all driven by this timer.
  */
+@Slf4j
 public enum DataTTLKeeperTimer {
     INSTANCE;
 
-    private static final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);
-
     private ModuleManager moduleManager;
     private ClusterNodesQuery clusterNodesQuery;
 
@@ -62,7 +62,7 @@ public enum DataTTLKeeperTimer {
                  .scheduleAtFixedRate(
                      new RunnableWithExceptionProtection(
                          this::delete,
-                         t -> logger.error("Remove data in background failure.", t)
+                         t -> log.error("Remove data in background failure.", t)
                      ), moduleConfig
                          .getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
     }
@@ -74,11 +74,11 @@ public enum DataTTLKeeperTimer {
     private void delete() {
         List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
         if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
-            logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
+            log.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
             return;
         }
 
-        logger.info("Beginning to remove expired metrics from the storage.");
+        log.info("Beginning to remove expired metrics from the storage.");
         IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
         List<Model> models = modelGetter.getModels();
         models.forEach(model -> {
@@ -95,8 +95,8 @@ public enum DataTTLKeeperTimer {
                          .getService(IHistoryDeleteDAO.class)
                          .deleteHistory(model, Metrics.TIME_BUCKET);
         } catch (IOException e) {
-            logger.warn("History of {} delete failure", model.getName());
-            logger.error(e.getMessage(), e);
+            log.warn("History of {} delete failure", model.getName());
+            log.error(e.getMessage(), e);
         }
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
new file mode 100644
index 0000000..3e44d41
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * File change monitor is a disk file watcher. It keeps watching the `last modified timestamp` of the file. Once the
+ * timestamp changes, it opens the file and hands the new content to the notifier as an input stream.
+ *
+ * The content is not compared with the previous one, so the notifier has to do that itself if it matters.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class FileChangeMonitor {
+    /**
+     * The backend scheduler to trigger all file monitoring.
+     */
+    private static ScheduledFuture<?> FILE_MONITOR_TASK_SCHEDULER;
+    /**
+     * The list contains all monitors.
+     */
+    private static List<FileChangeMonitor> MONITOR_INSTANCES = new ArrayList<>();
+
+    /**
+     * The absolute path of the monitored file.
+     */
+    private final String filePath;
+    /**
+     * Trigger notification when file is not there.
+     */
+    private final boolean acceptFileNotExisting;
+    /**
+     * The period at which the watching thread checks the file status, in seconds.
+     */
+    private final long watchingPeriodInSec;
+    /**
+     * The notifier when file content changed.
+     */
+    private final ContentChangedNotifier notifier;
+    /**
+     * The timestamp of the last file status check.
+     */
+    private long lastCheckTimestamp = 0;
+    /**
+     * The last modify time of the {@link #filePath}
+     */
+    private long lastModifiedTimestamp = 0;
+
+    /**
+     * Start the file monitor for this instance.
+     */
+    public synchronized void start() {
+        if (FILE_MONITOR_TASK_SCHEDULER == null) {
+            FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor()
+                                                   .scheduleAtFixedRate(
+                                                       FileChangeMonitor::run, 1, 1,
+                                                       TimeUnit.SECONDS
+                                                   );
+        }
+
+        this.checkAndNotify();
+        MONITOR_INSTANCES.add(this);
+    }
+
+    public synchronized void stop() {
+        MONITOR_INSTANCES.remove(this);
+    }
+
+    /** Check the file status at most once per watching period; if the file changed, send the notification. */
+    private void checkAndNotify() {
+        final long now = System.currentTimeMillis();
+        if (now - lastCheckTimestamp < watchingPeriodInSec * 1000) {
+            // Don't reach the period threshold, ignore this check.
+            return;
+        }
+        // Record this check, otherwise the period threshold above never takes effect.
+        lastCheckTimestamp = now;
+        File targetFile = new File(filePath);
+        if (!targetFile.exists() && acceptFileNotExisting) {
+            notifier.fileNotFound();
+        }
+        if (targetFile.isFile()) {
+            long lastModified = targetFile.lastModified();
+            if (lastModified != lastModifiedTimestamp) {
+                try (FileInputStream fileInputStream = new FileInputStream(targetFile)) {
+                    notifier.newFileContent(fileInputStream);
+                } catch (FileNotFoundException e) {
+                    log.error("The existed file turns to missing, watch file=" + filePath, e);
+                } catch (IOException e) {
+                    log.error("Read file failure, watch file=" + filePath, e);
+                } finally {
+                    lastModifiedTimestamp = lastModified;
+                }
+            }
+        }
+    }
+
+    /**
+     * Check all registered files.
+     */
+    private static void run() {
+        MONITOR_INSTANCES.forEach(monitor -> {
+            try {
+                monitor.checkAndNotify();
+            } catch (Throwable e) {
+                log.error("Error happens during monitoring file=" + monitor.filePath, e);
+            }
+        });
+    }
+
+    /**
+     * The callback when file content changed.
+     */
+    public interface ContentChangedNotifier {
+        /**
+         * Notify the new content by providing the file input stream
+         *
+         * @param readableStream points to the new content
+         * @throws IOException if error happens during reading.
+         */
+        void newFileContent(InputStream readableStream) throws IOException;
+
+        /**
+         * Notify the event of file not found.
+         */
+        void fileNotFound();
+    }
+}
diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
new file mode 100644
index 0000000..ce4d742
--- /dev/null
+++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FileChangeMonitorTest {
+    private static String FILE_NAME = "FileChangeMonitorTest.tmp";
+
+    @Test
+    public void test() throws InterruptedException, IOException {
+        final boolean[] fileNotFoundDetected = {false};
+        StringBuilder content = new StringBuilder();
+        FileChangeMonitor monitor = new FileChangeMonitor(
+            FILE_NAME, true, 1, new FileChangeMonitor.ContentChangedNotifier() {
+            @Override
+            public void newFileContent(final InputStream readableStream) throws IOException {
+                BufferedInputStream bis = new BufferedInputStream(readableStream);
+                byte[] bytes = new byte[1024];
+                int len = 0;
+                while ((len = bis.read(bytes)) != -1) {
+                    content.append(new String(bytes, 0, len));
+                }
+            }
+
+            @Override
+            public void fileNotFound() {
+                fileNotFoundDetected[0] = true;
+            }
+        });
+
+        monitor.start();
+        Assert.assertTrue(fileNotFoundDetected[0]);
+
+        File file = new File(FILE_NAME);
+        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
+        bos.write("test context".getBytes(Charset.forName("UTF-8")));
+        bos.close();
+
+        int countDown = 10;
+        boolean notified = false;
+        while (countDown-- > 0) {
+            if ("test context".equals(content.toString())) {
+                notified = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        Assert.assertTrue(notified);
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        File file = new File(FILE_NAME);
+        if (file.exists() && file.isFile()) {
+            file.delete();
+        }
+    }
+}