You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/25 07:46:17 UTC
[iotdb] branch iotdb-3227 updated: snapshot for UDFRegistrationService
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-3227 by this push:
new d8261859a2 snapshot for UDFRegistrationService
d8261859a2 is described below
commit d8261859a28ff8b38b6647bc5f1dac89a313f3c6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 25 12:13:05 2022 +0800
snapshot for UDFRegistrationService
---
.../iotdb/commons/udf/service/SnapshotUtils.java | 96 ++++++++++++++++++++++
.../commons/udf/service/UDFExecutableManager.java | 54 +-----------
.../udf/service/UDFRegistrationService.java | 29 +++++--
3 files changed, 122 insertions(+), 57 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
new file mode 100644
index 0000000000..9c30095196
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Directory-level helpers for taking and loading snapshots.
+ *
+ * <p>Both helpers stage their work in a sibling directory named {@code <destination>-<UUID>} and
+ * delete that staging directory before returning.
+ */
+public final class SnapshotUtils {
+
+  // Log under this class rather than under UDFExecutableManager, where this code used to live.
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotUtils.class);
+
+  /**
+   * Copies the {@code source} directory to {@code snapshotDestination}, replacing whatever is
+   * already at the destination path.
+   *
+   * <p>The copy is written to a temporary sibling directory first; the previous destination is
+   * only deleted once that copy has completed, and the temporary directory is then renamed to the
+   * destination path.
+   *
+   * @param source path of the directory to snapshot
+   * @param snapshotDestination path the snapshot should end up at
+   * @return {@code true} if the source does not exist (nothing to snapshot) or the final rename
+   *     succeeded; {@code false} if the rename failed
+   * @throws IOException if the temporary directory cannot be created or the copy fails
+   */
+  public static boolean takeSnapshotForDir(String source, String snapshotDestination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(source);
+    if (!sourceFile.exists()) {
+      // Nothing to snapshot; reported as success.
+      return true;
+    }
+
+    final File destinationFile = systemFileFactory.getFile(snapshotDestination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+    FileUtils.deleteQuietly(temporaryFile);
+    FileUtils.forceMkdir(temporaryFile);
+
+    try {
+      FileUtils.copyDirectory(sourceFile, temporaryFile);
+      // Drop the previous snapshot only after the new copy is complete.
+      FileUtils.deleteQuietly(destinationFile);
+      final boolean renamed = temporaryFile.renameTo(destinationFile);
+      if (!renamed) {
+        // File#renameTo reports failure only through its return value, so leave a trace.
+        LOGGER.warn(
+            "Failed to rename snapshot directory {} to {}.", temporaryFile, destinationFile);
+      }
+      return renamed;
+    } finally {
+      // Removes the staging directory if the copy or the rename did not go through.
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  /**
+   * Replaces the {@code destination} directory with a copy of {@code snapshotSource}.
+   *
+   * <p>An existing destination is first moved aside to a temporary sibling directory. If the copy
+   * fails, the partially written destination is deleted and the moved-aside content is moved
+   * back. The copy failure is logged together with its cause but is not rethrown, so a
+   * rolled-back load returns normally.
+   *
+   * @param snapshotSource path of the snapshot directory to load; if it does not exist this
+   *     method returns without touching the destination
+   * @param destination path of the directory to overwrite
+   * @throws IOException if moving the existing destination aside, creating the destination
+   *     directory, or moving the old content back during rollback fails
+   */
+  public static void loadSnapshotForDir(String snapshotSource, String destination)
+      throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(snapshotSource);
+    if (!sourceFile.exists()) {
+      return;
+    }
+
+    final File destinationFile = systemFileFactory.getFile(destination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+    try {
+      if (destinationFile.exists()) {
+        // Keep the current content around so it can be restored if the copy fails.
+        FileUtils.deleteQuietly(temporaryFile);
+        FileUtils.moveDirectory(destinationFile, temporaryFile);
+      }
+
+      FileUtils.forceMkdir(destinationFile);
+
+      try {
+        FileUtils.copyDirectory(sourceFile, destinationFile);
+      } catch (Exception e) {
+        // Pass the exception to the logger so the cause of the rollback is not lost.
+        LOGGER.error("Failed to load udf snapshot and rollback.", e);
+        FileUtils.deleteQuietly(destinationFile);
+
+        if (temporaryFile.exists()) {
+          FileUtils.moveDirectory(temporaryFile, destinationFile);
+        }
+      }
+    } finally {
+      // Discards the moved-aside content after a successful load.
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
+
+  private SnapshotUtils() {}
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index ea43a16b86..f7f18d7171 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -36,13 +34,10 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
public class UDFExecutableManager implements IService, SnapshotProcessor {
- private static final Logger LOGGER = LoggerFactory.getLogger(UDFExecutableManager.class);
-
private final String temporaryLibRoot;
private final String udfLibRoot;
@@ -181,62 +176,21 @@ public class UDFExecutableManager implements IService, SnapshotProcessor {
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException {
- return takeSnapshotForDir(
+ return SnapshotUtils.takeSnapshotForDir(
temporaryLibRoot,
snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary")
- && takeSnapshotForDir(
+ && SnapshotUtils.takeSnapshotForDir(
udfLibRoot,
snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf");
}
- private boolean takeSnapshotForDir(String source, String snapshotDestination) throws IOException {
- final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
- final File sourceFile = systemFileFactory.getFile(source);
- final File destinationFile = systemFileFactory.getFile(snapshotDestination);
- final File temporaryFile =
- systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
-
- FileUtils.deleteQuietly(temporaryFile);
- FileUtils.forceMkdir(temporaryFile);
-
- try {
- FileUtils.copyDirectory(sourceFile, temporaryFile);
- FileUtils.deleteQuietly(destinationFile);
- return temporaryFile.renameTo(destinationFile);
- } finally {
- FileUtils.deleteQuietly(temporaryFile);
- }
- }
-
@Override
public void processLoadSnapshot(File snapshotDir) throws IOException {
- loadSnapshotForDir(
+ SnapshotUtils.loadSnapshotForDir(
snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary",
temporaryLibRoot);
- loadSnapshotForDir(
+ SnapshotUtils.loadSnapshotForDir(
snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf",
udfLibRoot);
}
-
- private void loadSnapshotForDir(String snapshotSource, String destination) throws IOException {
- final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
- final File sourceFile = systemFileFactory.getFile(snapshotSource);
- final File destinationFile = systemFileFactory.getFile(destination);
- final File temporaryFile =
- systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
-
- try {
- FileUtils.moveDirectory(destinationFile, temporaryFile);
- FileUtils.forceMkdir(destinationFile);
- try {
- FileUtils.copyDirectory(sourceFile, destinationFile);
- } catch (Exception e) {
- LOGGER.error("Failed to load udf snapshot and rollback.");
- FileUtils.deleteQuietly(destinationFile);
- FileUtils.moveDirectory(temporaryFile, destinationFile);
- }
- } finally {
- FileUtils.deleteQuietly(temporaryFile);
- }
- }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index ced510ece9..87a75a86dd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -59,7 +59,7 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
private final String temporaryLogFileName;
private final ReentrantLock registrationLock;
- private final ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
+ private ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
private final ReentrantReadWriteLock logWriterLock;
private UDFLogWriter logWriter;
@@ -325,15 +325,20 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
@Override
public void start() throws StartupException {
try {
- registerBuiltinTimeSeriesGeneratingFunctions();
- makeDirIfNecessary();
- doRecovery();
- logWriter = new UDFLogWriter(logFileName);
+ recovery();
} catch (Exception e) {
throw new StartupException(e);
}
}
+ /**
+ * Rebuilds the in-memory registration state: replaces the registration map with a fresh empty
+ * map, then runs built-in function registration, directory creation and log recovery, and
+ * finally creates a new log writer on logFileName. Called from start() and from
+ * processLoadSnapshot().
+ *
+ * NOTE(review): any log writer created by an earlier call is overwritten here without being
+ * closed in this method; confirm it is released elsewhere before a snapshot load.
+ * NOTE(review): the map is swapped without taking registrationLock in this method; confirm
+ * callers serialize access.
+ */
+ private void recovery() throws Exception {
+ registrationInformation = new ConcurrentHashMap<>();
+ registerBuiltinTimeSeriesGeneratingFunctions();
+ makeDirIfNecessary();
+ doRecovery();
+ logWriter = new UDFLogWriter(logFileName);
+ }
+
private void registerBuiltinTimeSeriesGeneratingFunctions() {
for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction :
BuiltinTimeSeriesGeneratingFunction.values()) {
@@ -480,9 +485,19 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException {
- return false;
+ return SnapshotUtils.takeSnapshotForDir(
+ ulogFileDir, snapshotDir.getAbsolutePath() + File.separator + "udf");
}
@Override
- public void processLoadSnapshot(File snapshotDir) throws IOException {}
+ public void processLoadSnapshot(File snapshotDir) throws IOException {
+ // Step 1: overwrite ulogFileDir with the "udf" sub-directory of the snapshot.
+ SnapshotUtils.loadSnapshotForDir(
+ snapshotDir.getAbsolutePath() + File.separator + "udf", ulogFileDir);
+
+ // Step 2: rebuild the in-memory state by running recovery() again after the copy.
+ try {
+ recovery();
+ } catch (Exception e) {
+ // recovery() declares Exception; wrap it so this method only throws IOException.
+ throw new IOException(e);
+ }
+ }
}