You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/10 02:09:35 UTC
[skywalking] 01/01: Add file change detection mechanism
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch vault-support
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 8ac1e236b6efe36beb08146dd41cb56f74b13f47
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 10 10:09:17 2020 +0800
Add file change detection mechanism
---
.../core/storage/ttl/DataTTLKeeperTimer.java | 14 +-
.../oap/server/library/util/FileChangeMonitor.java | 156 +++++++++++++++++++++
.../server/library/util/FileChangeMonitorTest.java | 85 +++++++++++
3 files changed, 248 insertions(+), 7 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index d83049a..7477ca8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
@@ -46,11 +47,10 @@ import org.slf4j.LoggerFactory;
* override TTL, which could be more suitable for the implementation. No matter which TTL configurations are set, they
* are all driven by this timer.
*/
+@Slf4j
public enum DataTTLKeeperTimer {
INSTANCE;
- private static final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);
-
private ModuleManager moduleManager;
private ClusterNodesQuery clusterNodesQuery;
@@ -62,7 +62,7 @@ public enum DataTTLKeeperTimer {
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::delete,
- t -> logger.error("Remove data in background failure.", t)
+ t -> log.error("Remove data in background failure.", t)
), moduleConfig
.getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
}
@@ -74,11 +74,11 @@ public enum DataTTLKeeperTimer {
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
- logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
+ log.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
return;
}
- logger.info("Beginning to remove expired metrics from the storage.");
+ log.info("Beginning to remove expired metrics from the storage.");
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
List<Model> models = modelGetter.getModels();
models.forEach(model -> {
@@ -95,8 +95,8 @@ public enum DataTTLKeeperTimer {
.getService(IHistoryDeleteDAO.class)
.deleteHistory(model, Metrics.TIME_BUCKET);
} catch (IOException e) {
- logger.warn("History of {} delete failure", model.getName());
- logger.error(e.getMessage(), e);
+ log.warn("History of {} delete failure", model.getName());
+ log.error(e.getMessage(), e);
}
}
}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
new file mode 100644
index 0000000..3e44d41
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * File change monitor is a disk file watcher. It periodically checks the `last modified timestamp` of the file; once
+ * the timestamp differs from the one seen by the previous check, it opens the file and hands the stream of the new
+ * content to the {@link ContentChangedNotifier}. Optionally, it also reports that the file does not exist.
+ *
+ * All started monitors are driven by one shared background task, which ticks every second. Each monitor applies its
+ * own {@link #watchingPeriodInSec} on top of that tick. The notifier callbacks are invoked on the thread calling
+ * {@link #start()} for the first check, and on the shared background thread afterwards.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class FileChangeMonitor {
+    /**
+     * The backend scheduler to trigger all file monitoring. Guarded by the lock of {@link #MONITOR_INSTANCES}.
+     */
+    private static ScheduledFuture<?> FILE_MONITOR_TASK_SCHEDULER;
+    /**
+     * The list contains all monitors. Every access must hold the lock of this list, because it is mutated by the
+     * threads calling {@link #start()}/{@link #stop()} and read by the scheduler thread.
+     */
+    private static final List<FileChangeMonitor> MONITOR_INSTANCES = new ArrayList<>();
+
+    /**
+     * The absolute path of the monitored file.
+     */
+    private final String filePath;
+    /**
+     * Trigger notification when file is not there.
+     */
+    private final boolean acceptFileNotExisting;
+    /**
+     * The period of watching thread checking the file status. Unit is the second.
+     */
+    private final long watchingPeriodInSec;
+    /**
+     * The notifier when file content changed.
+     */
+    private final ContentChangedNotifier notifier;
+    /**
+     * The timestamp when last time do status checked.
+     */
+    private long lastCheckTimestamp = 0;
+    /**
+     * The last modify time of the {@link #filePath}
+     */
+    private long lastModifiedTimestamp = 0;
+
+    /**
+     * Start the file monitor for this instance. The first check runs immediately on the calling thread, then the
+     * instance is registered to the shared background task. Starting an already started monitor does not register it
+     * a second time.
+     */
+    public synchronized void start() {
+        // Check before registering, so the background thread can't run the very first check concurrently.
+        this.checkAndNotify();
+
+        synchronized (MONITOR_INSTANCES) {
+            if (FILE_MONITOR_TASK_SCHEDULER == null) {
+                // The executor is never shut down, so its thread must be a daemon, otherwise it blocks the JVM exit.
+                FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor(task -> {
+                    Thread thread = new Thread(task, "FileChangeMonitor");
+                    thread.setDaemon(true);
+                    return thread;
+                }).scheduleAtFixedRate(
+                    FileChangeMonitor::run, 1, 1,
+                    TimeUnit.SECONDS
+                );
+            }
+            if (!MONITOR_INSTANCES.contains(this)) {
+                MONITOR_INSTANCES.add(this);
+            }
+        }
+    }
+
+    /**
+     * Stop the file monitor for this instance. A check which is already in progress on the background thread could
+     * still deliver one more notification.
+     */
+    public synchronized void stop() {
+        synchronized (MONITOR_INSTANCES) {
+            MONITOR_INSTANCES.remove(this);
+        }
+    }
+
+    /**
+     * Check the file status, if changed, send the notification. The check is skipped when the last one happened less
+     * than {@link #watchingPeriodInSec} ago.
+     */
+    private synchronized void checkAndNotify() {
+        final long now = System.currentTimeMillis();
+        if (now - lastCheckTimestamp < watchingPeriodInSec * 1000) {
+            // Don't reach the period threshold, ignore this check.
+            return;
+        }
+        // Record this check, otherwise the period threshold above never takes effect.
+        lastCheckTimestamp = now;
+
+        File targetFile = new File(filePath);
+        if (!targetFile.exists()) {
+            // Forget the known timestamp, so a file coming back with its old modify time is reported again.
+            lastModifiedTimestamp = 0;
+            if (acceptFileNotExisting) {
+                notifier.fileNotFound();
+            }
+            return;
+        }
+        if (targetFile.isFile()) {
+            long lastModified = targetFile.lastModified();
+
+            if (lastModified != lastModifiedTimestamp) {
+                try (FileInputStream fileInputStream = new FileInputStream(targetFile)) {
+                    notifier.newFileContent(fileInputStream);
+                } catch (FileNotFoundException e) {
+                    log.error("The existed file turns to missing, watch file=" + filePath, e);
+                } catch (IOException e) {
+                    log.error("Read file failure, watch file=" + filePath, e);
+                } finally {
+                    // Updated even on failure, a failed read is not retried until the file is modified again.
+                    lastModifiedTimestamp = lastModified;
+                }
+            }
+        }
+    }
+
+    /**
+     * Check all registered file.
+     */
+    private static void run() {
+        // Iterate over a snapshot. An exception escaping from this method, such as a
+        // ConcurrentModificationException, would cancel the fixed-rate task permanently.
+        final List<FileChangeMonitor> snapshot;
+        synchronized (MONITOR_INSTANCES) {
+            snapshot = new ArrayList<>(MONITOR_INSTANCES);
+        }
+        snapshot.forEach(monitor -> {
+            try {
+                monitor.checkAndNotify();
+            } catch (Throwable e) {
+                log.error("Error happens during monitoring file=" + monitor.filePath, e);
+            }
+        });
+    }
+
+    /**
+     * The callback when file content changed.
+     */
+    public interface ContentChangedNotifier {
+        /**
+         * Notify the new content by providing the file input stream. The stream is closed by the monitor after this
+         * method returns.
+         *
+         * @param readableStream points to the new content
+         * @throws IOException if error happens during reading.
+         */
+        void newFileContent(InputStream readableStream) throws IOException;
+
+        /**
+         * Notify the event of file not found. It is repeated on every check as long as the file is missing.
+         */
+        void fileNotFound();
+    }
+}
diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
new file mode 100644
index 0000000..ce4d742
--- /dev/null
+++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/FileChangeMonitorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FileChangeMonitorTest {
+ private static String FILE_NAME = "FileChangeMonitorTest.tmp";
+
+ @Test
+ public void test() throws InterruptedException, IOException {
+ final boolean[] fileNotFoundDetected = {false};
+ StringBuilder content = new StringBuilder();
+ FileChangeMonitor monitor = new FileChangeMonitor(
+ FILE_NAME, true, 1, new FileChangeMonitor.ContentChangedNotifier() {
+ @Override
+ public void newFileContent(final InputStream readableStream) throws IOException {
+ BufferedInputStream bis = new BufferedInputStream(readableStream);
+ byte[] bytes = new byte[1024];
+ int len = 0;
+ while ((len = bis.read(bytes)) != -1) {
+ content.append(new String(bytes, 0, len));
+ }
+ }
+
+ @Override
+ public void fileNotFound() {
+ fileNotFoundDetected[0] = true;
+ }
+ });
+
+ monitor.start();
+ Assert.assertTrue(fileNotFoundDetected[0]);
+
+ File file = new File(FILE_NAME);
+ BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
+ bos.write("test context".getBytes(Charset.forName("UTF-8")));
+ bos.close();
+
+ int countDown = 10;
+ boolean notified = false;
+ while (countDown-- > 0) {
+ if ("test context".equals(content.toString())) {
+ notified = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue(notified);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ File file = new File(FILE_NAME);
+ if (file.exists() && file.isFile()) {
+ file.delete();
+ }
+ }
+}