You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/10 02:09:34 UTC

[skywalking] branch vault-support created (now 8ac1e23)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch vault-support
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 8ac1e23  Add file change detection mechanism

This branch includes the following new commits:

     new 8ac1e23  Add file change detection mechanism

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Add file change detection mechanism

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch vault-support
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 8ac1e236b6efe36beb08146dd41cb56f74b13f47
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 10 10:09:17 2020 +0800

    Add file change detection mechanism
---
 .../core/storage/ttl/DataTTLKeeperTimer.java       |  14 +-
 .../oap/server/library/util/FileChangeMonitor.java | 156 +++++++++++++++++++++
 .../server/library/util/FileChangeMonitorTest.java |  85 +++++++++++
 3 files changed, 248 insertions(+), 7 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index d83049a..7477ca8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
@@ -46,11 +47,10 @@ import org.slf4j.LoggerFactory;
  * override TTL, which could be more suitable for the implementation. No matter which TTL configurations are set, they
  * are all driven by this timer.
  */
+@Slf4j
 public enum DataTTLKeeperTimer {
     INSTANCE;
 
-    private static final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);
-
     private ModuleManager moduleManager;
     private ClusterNodesQuery clusterNodesQuery;
 
@@ -62,7 +62,7 @@ public enum DataTTLKeeperTimer {
                  .scheduleAtFixedRate(
                      new RunnableWithExceptionProtection(
                          this::delete,
-                         t -> logger.error("Remove data in background failure.", t)
+                         t -> log.error("Remove data in background failure.", t)
                      ), moduleConfig
                          .getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
     }
@@ -74,11 +74,11 @@ public enum DataTTLKeeperTimer {
     private void delete() {
         List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
         if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
-            logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
+            log.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
             return;
         }
 
-        logger.info("Beginning to remove expired metrics from the storage.");
+        log.info("Beginning to remove expired metrics from the storage.");
         IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
         List<Model> models = modelGetter.getModels();
         models.forEach(model -> {
@@ -95,8 +95,8 @@ public enum DataTTLKeeperTimer {
                          .getService(IHistoryDeleteDAO.class)
                          .deleteHistory(model, Metrics.TIME_BUCKET);
         } catch (IOException e) {
-            logger.warn("History of {} delete failure", model.getName());
-            logger.error(e.getMessage(), e);
+            log.warn("History of {} delete failure", model.getName());
+            log.error(e.getMessage(), e);
         }
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
new file mode 100644
index 0000000..3e44d41
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * File change monitor is a disk file watcher. It polls the `last modified timestamp` of the monitored file and, when
+ * that timestamp differs from the previously recorded one, opens the file and hands its content stream to the notifier.
+ *
+ * NOTE(review): the content itself is not compared with the previous content here; any timestamp change is notified.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class FileChangeMonitor {
+    /**
+     * The shared background task, created on the first {@link #start()}, running {@link #run()} once per second.
+     */
+    private static ScheduledFuture<?> FILE_MONITOR_TASK_SCHEDULER;
+    /**
+     * All started monitors, read by {@link #run()} on the scheduler thread. NOTE(review): plain ArrayList, confirm safety.
+     */
+    private static List<FileChangeMonitor> MONITOR_INSTANCES = new ArrayList<>();
+
+    /**
+     * The path of the monitored file, passed to {@link File#File(String)} as is.
+     */
+    private final String filePath;
+    /**
+     * When true, {@link ContentChangedNotifier#fileNotFound()} is called on each check that finds the file missing.
+     */
+    private final boolean acceptFileNotExisting;
+    /**
+     * The minimum interval, in seconds, between two status checks, compared against {@link #lastCheckTimestamp}.
+     */
+    private final long watchingPeriodInSec;
+    /**
+     * The callback receiving the new content stream and the file-not-found event.
+     */
+    private final ContentChangedNotifier notifier;
+    /**
+     * The timestamp (ms) of the last status check. NOTE(review): never assigned in this class, so it stays 0 - confirm.
+     */
+    private long lastCheckTimestamp = 0;
+    /**
+     * The last modified time of {@link #filePath} recorded by the latest read attempt; 0 before the first one.
+     */
+    private long lastModifiedTimestamp = 0;
+
+    /**
+     * Start monitoring: create the shared scheduler if absent, run one check immediately, then register this instance.
+     */
+    public synchronized void start() {
+        if (FILE_MONITOR_TASK_SCHEDULER == null) {
+            FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor()
+                                                   .scheduleAtFixedRate(
+                                                       FileChangeMonitor::run, 1, 1,
+                                                       TimeUnit.SECONDS
+                                                   );
+        }
+
+        this.checkAndNotify();
+        MONITOR_INSTANCES.add(this);
+    }
+
+    public synchronized void stop() {
+        MONITOR_INSTANCES.remove(this);
+    }
+
+    /**
+     * Check the file once: report a missing file if accepted, and pass the content on when the modified time changed.
+     */
+    private void checkAndNotify() {
+        if (System.currentTimeMillis() - lastCheckTimestamp < watchingPeriodInSec * 1000) {
+            // The watching period has not elapsed since the last check, skip this round.
+            return;
+        }
+        File targetFile = new File(filePath);
+        if (!targetFile.exists() && acceptFileNotExisting) {
+            notifier.fileNotFound();
+        }
+        if (targetFile.isFile()) {
+            long lastModified = targetFile.lastModified();
+
+            if (lastModified != lastModifiedTimestamp) {
+                try (FileInputStream fileInputStream = new FileInputStream(targetFile)) {
+                    notifier.newFileContent(fileInputStream);
+                } catch (FileNotFoundException e) {
+                    log.error("The existed file turns to missing, watch file=" + filePath, e);
+                } catch (IOException e) {
+                    log.error("Read file failure, watch file=" + filePath, e);
+                } finally {
+                    lastModifiedTimestamp = lastModified;
+                }
+            }
+        }
+    }
+
+    /**
+     * Check all registered monitors; a failure in one monitor is logged and does not stop the others.
+     */
+    private static void run() {
+        MONITOR_INSTANCES.forEach(monitor -> {
+            try {
+                monitor.checkAndNotify();
+            } catch (Throwable e) {
+                log.error("Error happens during monitoring file=" + monitor.filePath, e);
+            }
+        });
+    }
+
+    /**
+     * The callback notified when the monitored file has a new modified time or does not exist.
+     */
+    public interface ContentChangedNotifier {
+        /**
+         * Notify the new content by providing the file input stream, which the monitor closes after this call returns.
+         *
+         * @param readableStream points to the new content
+         * @throws IOException if error happens during reading; the monitor catches and logs it.
+         */
+        void newFileContent(InputStream readableStream) throws IOException;
+
+        /**
+         * Notify that the monitored file does not exist.
+         */
+        void fileNotFound();
+    }
+}
diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
new file mode 100644
index 0000000..ce4d742
--- /dev/null
+++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FileChangeMonitorTest {
+    private static String FILE_NAME = "FileChangeMonitorTest.tmp";
+
+    @Test
+    public void test() throws InterruptedException, IOException {
+        final boolean[] fileNotFoundDetected = {false};
+        StringBuilder content = new StringBuilder();
+        FileChangeMonitor monitor = new FileChangeMonitor(
+            FILE_NAME, true, 1, new FileChangeMonitor.ContentChangedNotifier() {
+            @Override
+            public void newFileContent(final InputStream readableStream) throws IOException {
+                BufferedInputStream bis = new BufferedInputStream(readableStream);
+                byte[] bytes = new byte[1024];
+                int len = 0;
+                while ((len = bis.read(bytes)) != -1) {
+                    content.append(new String(bytes, 0, len));
+                }
+            }
+
+            @Override
+            public void fileNotFound() {
+                fileNotFoundDetected[0] = true;
+            }
+        });
+        // start() runs one check on the calling thread, so the missing file is reported before it returns.
+        monitor.start();
+        Assert.assertTrue(fileNotFoundDetected[0]);
+        // Create the monitored file so that a later background check can deliver its content.
+        File file = new File(FILE_NAME);
+        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
+        bos.write("test context".getBytes(Charset.forName("UTF-8")));
+        bos.close();
+        // Poll for up to about 10 seconds until the notifier has collected the expected content.
+        int countDown = 10;
+        boolean notified = false;
+        while (countDown-- > 0) {
+            if ("test context".equals(content.toString())) {
+                notified = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        Assert.assertTrue(notified);
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        File file = new File(FILE_NAME);
+        if (file.exists() && file.isFile()) {
+            file.delete();
+        }
+    }
+}