You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/25 07:46:17 UTC

[iotdb] branch iotdb-3227 updated: snapshot for UDFRegistrationService

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-3227 by this push:
     new d8261859a2 snapshot for UDFRegistrationService
d8261859a2 is described below

commit d8261859a28ff8b38b6647bc5f1dac89a313f3c6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 25 12:13:05 2022 +0800

    snapshot for UDFRegistrationService
---
 .../iotdb/commons/udf/service/SnapshotUtils.java   | 96 ++++++++++++++++++++++
 .../commons/udf/service/UDFExecutableManager.java  | 54 +-----------
 .../udf/service/UDFRegistrationService.java        | 29 +++++--
 3 files changed, 122 insertions(+), 57 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
new file mode 100644
index 0000000000..9c30095196
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+public class SnapshotUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotUtils.class);
+
+  /**
+   * Copies {@code source} to {@code snapshotDestination} through a uniquely named temp directory.
+   * Returns true if {@code source} is absent or the final rename succeeded, false otherwise.
+   */
+  public static boolean takeSnapshotForDir(String source, String snapshotDestination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(source);
+    final File destinationFile = systemFileFactory.getFile(snapshotDestination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    if (!sourceFile.exists()) {
+      return true;
+    }
+    FileUtils.deleteQuietly(temporaryFile);
+    FileUtils.forceMkdir(temporaryFile);
+    try {
+      FileUtils.copyDirectory(sourceFile, temporaryFile);
+      FileUtils.deleteQuietly(destinationFile);
+      return temporaryFile.renameTo(destinationFile);
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  /**
+   * Restores {@code destination} from {@code snapshotSource}; no-op if the snapshot is absent.
+   * If the copy fails, the previous content is put back and the error is logged, not rethrown.
+   */
+  public static void loadSnapshotForDir(String snapshotSource, String destination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(snapshotSource);
+    final File destinationFile = systemFileFactory.getFile(destination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    if (!sourceFile.exists()) {
+      return;
+    }
+    try {
+      if (destinationFile.exists()) {
+        FileUtils.deleteQuietly(temporaryFile);
+        FileUtils.moveDirectory(destinationFile, temporaryFile);
+      }
+      try {
+        // Creating the directory inside the try lets a mkdir failure roll back as well.
+        FileUtils.forceMkdir(destinationFile);
+        FileUtils.copyDirectory(sourceFile, destinationFile);
+      } catch (Exception e) {
+        LOGGER.error("Failed to load udf snapshot and rollback.", e);
+        FileUtils.deleteQuietly(destinationFile);
+        if (temporaryFile.exists()) {
+          FileUtils.moveDirectory(temporaryFile, destinationFile);
+        }
+      }
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  private SnapshotUtils() {}
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index ea43a16b86..f7f18d7171 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,13 +34,10 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class UDFExecutableManager implements IService, SnapshotProcessor {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(UDFExecutableManager.class);
-
   private final String temporaryLibRoot;
   private final String udfLibRoot;
 
@@ -181,62 +176,21 @@ public class UDFExecutableManager implements IService, SnapshotProcessor {
 
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws IOException {
-    return takeSnapshotForDir(
+    return SnapshotUtils.takeSnapshotForDir(
             temporaryLibRoot,
             snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary")
-        && takeSnapshotForDir(
+        && SnapshotUtils.takeSnapshotForDir(
             udfLibRoot,
             snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf");
   }
 
-  private boolean takeSnapshotForDir(String source, String snapshotDestination) throws IOException {
-    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
-    final File sourceFile = systemFileFactory.getFile(source);
-    final File destinationFile = systemFileFactory.getFile(snapshotDestination);
-    final File temporaryFile =
-        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
-
-    FileUtils.deleteQuietly(temporaryFile);
-    FileUtils.forceMkdir(temporaryFile);
-
-    try {
-      FileUtils.copyDirectory(sourceFile, temporaryFile);
-      FileUtils.deleteQuietly(destinationFile);
-      return temporaryFile.renameTo(destinationFile);
-    } finally {
-      FileUtils.deleteQuietly(temporaryFile);
-    }
-  }
-
   @Override
   public void processLoadSnapshot(File snapshotDir) throws IOException {
-    loadSnapshotForDir(
+    SnapshotUtils.loadSnapshotForDir(
         snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary",
         temporaryLibRoot);
-    loadSnapshotForDir(
+    SnapshotUtils.loadSnapshotForDir(
         snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf",
         udfLibRoot);
   }
-
-  private void loadSnapshotForDir(String snapshotSource, String destination) throws IOException {
-    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
-    final File sourceFile = systemFileFactory.getFile(snapshotSource);
-    final File destinationFile = systemFileFactory.getFile(destination);
-    final File temporaryFile =
-        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
-
-    try {
-      FileUtils.moveDirectory(destinationFile, temporaryFile);
-      FileUtils.forceMkdir(destinationFile);
-      try {
-        FileUtils.copyDirectory(sourceFile, destinationFile);
-      } catch (Exception e) {
-        LOGGER.error("Failed to load udf snapshot and rollback.");
-        FileUtils.deleteQuietly(destinationFile);
-        FileUtils.moveDirectory(temporaryFile, destinationFile);
-      }
-    } finally {
-      FileUtils.deleteQuietly(temporaryFile);
-    }
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index ced510ece9..87a75a86dd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -59,7 +59,7 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
   private final String temporaryLogFileName;
 
   private final ReentrantLock registrationLock;
-  private final ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
+  private ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
 
   private final ReentrantReadWriteLock logWriterLock;
   private UDFLogWriter logWriter;
@@ -325,15 +325,20 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
   @Override
   public void start() throws StartupException {
     try {
-      registerBuiltinTimeSeriesGeneratingFunctions();
-      makeDirIfNecessary();
-      doRecovery();
-      logWriter = new UDFLogWriter(logFileName);
+      recovery();
     } catch (Exception e) {
       throw new StartupException(e);
     }
   }
 
+  private void recovery() throws Exception {
+    registrationInformation = new ConcurrentHashMap<>();
+    registerBuiltinTimeSeriesGeneratingFunctions();
+    makeDirIfNecessary();
+    doRecovery();
+    logWriter = new UDFLogWriter(logFileName);
+  }
+
   private void registerBuiltinTimeSeriesGeneratingFunctions() {
     for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction :
         BuiltinTimeSeriesGeneratingFunction.values()) {
@@ -480,9 +485,19 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
 
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws IOException {
-    return false;
+    return SnapshotUtils.takeSnapshotForDir(
+        ulogFileDir, snapshotDir.getAbsolutePath() + File.separator + "udf");
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {}
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    SnapshotUtils.loadSnapshotForDir(
+        snapshotDir.getAbsolutePath() + File.separator + "udf", ulogFileDir);
+
+    try {
+      recovery();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }