You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/29 14:03:18 UTC

[zeppelin] branch master updated: [ZEPPELIN-4907]. Add LocalRecoveryStorage

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9042d3c  [ZEPPELIN-4907]. Add LocalRecoveryStorage
9042d3c is described below

commit 9042d3c9ab43e28002e947c0f1ad0d67207a2730
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Jun 24 13:03:38 2020 +0800

    [ZEPPELIN-4907]. Add LocalRecoveryStorage
    
    ### What is this PR for?
    
    This PR add LocalRecoveryStorage based on java native local file system. Without this PR, user can only enable recovery by using FileSystemRecoveryStorage which depends on hadoop. This implementation doesn't depends on hadoop, user can enable recovery just by using this RecoveryStorage.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4907
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3822 from zjffdu/ZEPPELIN-4907 and squashes the following commits:
    
    9d10a10ac [Jeff Zhang] [ZEPPELIN-4907]. Add LocalRecoveryStorage
---
 conf/zeppelin-site.xml.template                    |  8 +-
 .../recovery/FileSystemRecoveryStorage.java        | 40 ++-------
 .../interpreter/recovery/LocalRecoveryStorage.java | 97 ++++++++++++++++++++++
 .../interpreter/recovery/RecoveryUtils.java        | 97 ++++++++++++++++++++++
 .../zeppelin/storage/LocalConfigStorage.java       | 65 ++-------------
 .../java/org/apache/zeppelin/util/FileUtils.java   | 74 +++++++++++++++++
 .../recovery/FileSystemRecoveryStorageTest.java    | 21 ++++-
 ...rageTest.java => LocalRecoveryStorageTest.java} | 45 ++++++----
 .../zeppelin/storage/LocalConfigStorageTest.java   |  7 +-
 9 files changed, 340 insertions(+), 114 deletions(-)

diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index a40b9ec..be99433 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -648,8 +648,14 @@
 <!--
 <property>
   <name>zeppelin.recovery.storage.class</name>
+  <value>org.apache.zeppelin.interpreter.recovery.LocalRecoveryStorage</value>
+  <description>ReoveryStorage implementation based on java native local file system</description>
+</property>
+
+<property>
+  <name>zeppelin.recovery.storage.class</name>
   <value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
-  <description>ReoveryStorage implementation</description>
+  <description>ReoveryStorage implementation based on hadoop FileSystem</description>
 </property>
 -->
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
index 46ac23a..d43d9f9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
@@ -17,21 +17,16 @@
 
 package org.apache.zeppelin.interpreter.recovery;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.notebook.FileSystemStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,20 +74,10 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
   private void save(String interpreterSettingName) throws IOException {
     InterpreterSetting interpreterSetting =
         interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
-    List<String> recoveryContent = new ArrayList<>();
-    if (interpreterSetting != null) {
-      for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
-        RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
-        if (interpreterProcess != null && interpreterProcess.isRunning()) {
-          recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
-                  interpreterProcess.getPort());
-        }
-      }
-    }
-    String recoveryContentStr = StringUtils.join(recoveryContent, System.lineSeparator());
-    LOGGER.debug("Updating recovery data of {}: {}", interpreterSettingName, recoveryContentStr);
+    String recoveryData = RecoveryUtils.getRecoveryData(interpreterSetting);
+    LOGGER.debug("Updating recovery data of {}: {}", interpreterSettingName, recoveryData);
     Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
-    fs.writeFile(recoveryContentStr, recoveryFile, true);
+    fs.writeFile(recoveryData, recoveryFile, true);
   }
 
   @Override
@@ -105,23 +90,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
       String interpreterSettingName = fileName.substring(0,
           fileName.length() - ".recovery".length());
       String recoveryContent = fs.readFile(path);
-      if (!StringUtils.isBlank(recoveryContent)) {
-        for (String line : recoveryContent.split(System.lineSeparator())) {
-          String[] tokens = line.split("\t");
-          String interpreterGroupId = tokens[0];
-          String[] hostPort = tokens[1].split(":");
-          int connectTimeout =
-                  zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
-          RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
-              interpreterSettingName, interpreterGroupId, connectTimeout,
-                  interpreterSettingManager.getInterpreterEventServer().getHost(),
-                  interpreterSettingManager.getInterpreterEventServer().getPort(),
-                  hostPort[0], Integer.parseInt(hostPort[1]), true);
-          clients.put(interpreterGroupId, client);
-          LOGGER.info("Recovering Interpreter Process: " + interpreterGroupId + ", " +
-                  hostPort[0] + ":" + hostPort[1]);
-        }
-      }
+      clients.putAll(RecoveryUtils.restoreFromRecoveryData(
+              recoveryContent, interpreterSettingName, interpreterSettingManager, zConf));
     }
 
     return clients;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java
new file mode 100644
index 0000000..078722e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * RecoveryStorage implementation based on java native local file system.
+ */
+public class LocalRecoveryStorage extends RecoveryStorage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalRecoveryStorage.class);
+
+  private InterpreterSettingManager interpreterSettingManager;
+  private File recoveryDir;
+
+  public LocalRecoveryStorage(ZeppelinConfiguration zConf) {
+    super(zConf);
+  }
+
+  public LocalRecoveryStorage(ZeppelinConfiguration zConf,
+                              InterpreterSettingManager interpreterSettingManager)
+          throws IOException {
+    super(zConf);
+    this.recoveryDir = new File(zConf.getRecoveryDir());
+    LOGGER.info("Using folder {} to store recovery data", recoveryDir);
+    if (!this.recoveryDir.exists()) {
+      FileUtils.forceMkdir(this.recoveryDir);
+    }
+    if (!this.recoveryDir.isDirectory()) {
+      throw new IOException("Recovery dir " + this.recoveryDir.getAbsolutePath() + " is not a directory");
+    }
+    this.interpreterSettingManager = interpreterSettingManager;
+  }
+
+  @Override
+  public void onInterpreterClientStart(InterpreterClient client) throws IOException {
+    save(client.getInterpreterSettingName());
+  }
+
+  @Override
+  public void onInterpreterClientStop(InterpreterClient client) throws IOException {
+    save(client.getInterpreterSettingName());
+  }
+
+  @Override
+  public Map<String, InterpreterClient> restore() throws IOException {
+    Map<String, InterpreterClient> clients = new HashMap<>();
+    File[] recoveryFiles = recoveryDir.listFiles(file -> file.getName().endsWith(".recovery"));
+    for (File recoveryFile : recoveryFiles) {
+      String fileName = recoveryFile.getName();
+      String interpreterSettingName = fileName.substring(0,
+              fileName.length() - ".recovery".length());
+      String recoveryData = org.apache.zeppelin.util.FileUtils.readFromFile(recoveryFile);
+      clients.putAll(RecoveryUtils.restoreFromRecoveryData(
+              recoveryData, interpreterSettingName, interpreterSettingManager, zConf));
+    }
+
+    return clients;
+  }
+
+  private void save(String interpreterSettingName) throws IOException {
+    InterpreterSetting interpreterSetting =
+            interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
+    String recoveryData = RecoveryUtils.getRecoveryData(interpreterSetting);
+    LOGGER.debug("Updating recovery data of {}: {}", interpreterSettingName, recoveryData);
+    File recoveryFile = new File(recoveryDir, interpreterSettingName + ".recovery");
+    org.apache.zeppelin.util.FileUtils.atomicWriteToFile(recoveryData, recoveryFile);
+  }
+}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
new file mode 100644
index 0000000..83db705
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecoveryUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryUtils.class);
+
+  /**
+   * Get the recovery data of this interpreterSetting.
+   * It contains all the metadata of running interpreter processes under this interpreterSetting.
+   *
+   * @param interpreterSetting
+   * @return
+   */
+  public static String getRecoveryData(InterpreterSetting interpreterSetting) {
+    List<String> recoveryData = new ArrayList<>();
+    if (interpreterSetting != null) {
+      for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
+        RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
+        if (interpreterProcess != null && interpreterProcess.isRunning()) {
+          recoveryData.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
+                  interpreterProcess.getPort());
+        }
+      }
+    }
+    return StringUtils.join(recoveryData, System.lineSeparator());
+  }
+
+  /**
+   * Return interpreterClient from recoveryData of one interpreterSetting.
+   *
+   * @param recoveryData
+   * @param interpreterSettingName
+   * @param interpreterSettingManager
+   * @param zConf
+   * @return
+   */
+  public static Map<String, InterpreterClient> restoreFromRecoveryData(String recoveryData,
+                                                                       String interpreterSettingName,
+                                                                       InterpreterSettingManager interpreterSettingManager,
+                                                                       ZeppelinConfiguration zConf) {
+
+    Map<String, InterpreterClient> clients = new HashMap<>();
+    if (!StringUtils.isBlank(recoveryData)) {
+      for (String line : recoveryData.split(System.lineSeparator())) {
+        String[] tokens = line.split("\t");
+        String interpreterGroupId = tokens[0];
+        String[] hostPort = tokens[1].split(":");
+        int connectTimeout =
+                zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+        RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
+                interpreterSettingName, interpreterGroupId, connectTimeout,
+                interpreterSettingManager.getInterpreterEventServer().getHost(),
+                interpreterSettingManager.getInterpreterEventServer().getPort(),
+                hostPort[0], Integer.parseInt(hostPort[1]), true);
+        clients.put(interpreterGroupId, client);
+        LOGGER.info("Recovering Interpreter Process: " + interpreterGroupId + ", " +
+                hostPort[0] + ":" + hostPort[1]);
+      }
+    }
+
+    return clients;
+  }
+}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
index 1f2eb3c..d7c3f35 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
@@ -17,23 +17,15 @@
 
 package org.apache.zeppelin.storage;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterInfoSaving;
 import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving;
+import org.apache.zeppelin.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.FileSystems;
-import java.nio.file.FileSystem;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.EnumSet;
 import java.util.Set;
@@ -59,7 +51,7 @@ public class LocalConfigStorage extends ConfigStorage {
   @Override
   public void save(InterpreterInfoSaving settingInfos) throws IOException {
     LOGGER.info("Save Interpreter Setting to {}", interpreterSettingPath.getAbsolutePath());
-    atomicWriteToFile(settingInfos.toJson(), interpreterSettingPath);
+    FileUtils.atomicWriteToFile(settingInfos.toJson(), interpreterSettingPath);
   }
 
   @Override
@@ -69,14 +61,14 @@ public class LocalConfigStorage extends ConfigStorage {
       return null;
     }
     LOGGER.info("Load Interpreter Setting from file: {}", interpreterSettingPath);
-    String json = readFromFile(interpreterSettingPath);
+    String json = FileUtils.readFromFile(interpreterSettingPath);
     return buildInterpreterInfoSaving(json);
   }
 
   @Override
   public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException {
     LOGGER.info("Save notebook authorization to file: {}", authorizationPath);
-    atomicWriteToFile(authorizationInfoSaving.toJson(), authorizationPath);
+    FileUtils.atomicWriteToFile(authorizationInfoSaving.toJson(), authorizationPath);
   }
 
   @Override
@@ -86,7 +78,7 @@ public class LocalConfigStorage extends ConfigStorage {
       return null;
     }
     LOGGER.info("Load notebook authorization from file: {}", authorizationPath);
-    String json = readFromFile(authorizationPath);
+    String json = FileUtils.readFromFile(authorizationPath);
     return NotebookAuthorizationInfoSaving.fromJson(json);
   }
 
@@ -97,56 +89,13 @@ public class LocalConfigStorage extends ConfigStorage {
       return null;
     }
     LOGGER.info("Load Credential from file: {}", credentialPath);
-    return readFromFile(credentialPath);
+    return FileUtils.readFromFile(credentialPath);
   }
 
   @Override
   public void saveCredentials(String credentials) throws IOException {
     LOGGER.info("Save Credentials to file: {}", credentialPath);
     Set<PosixFilePermission> permissions = EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
-    atomicWriteToFile(credentials, credentialPath, permissions);
+    FileUtils.atomicWriteToFile(credentials, credentialPath, permissions);
   }
-
-  @VisibleForTesting
-  static String readFromFile(File file) throws IOException {
-    try (FileInputStream is = new FileInputStream(file)) {
-      return IOUtils.toString(is);
-    }
-  }
-
-  @VisibleForTesting
-  static void atomicWriteToFile(String content, File file, Set<PosixFilePermission> permissions) throws IOException {
-    FileSystem defaultFileSystem = FileSystems.getDefault();
-    Path destinationFilePath = defaultFileSystem.getPath(file.getCanonicalPath());
-    Path destinationDirectory = destinationFilePath.getParent();
-    Files.createDirectories(destinationDirectory);
-    File tempFile = Files.createTempFile(destinationDirectory, file.getName(), null).toFile();
-    if (permissions != null && !permissions.isEmpty()) {
-      Files.setPosixFilePermissions(tempFile.toPath(), permissions);
-    }
-    try (FileOutputStream out = new FileOutputStream(tempFile)) {
-      IOUtils.write(content, out);
-    } catch (IOException iox) {
-      if (!tempFile.delete()) {
-        tempFile.deleteOnExit();
-      }
-      throw iox;
-    }
-    try {
-      file.getParentFile().mkdirs();
-      Files.move(tempFile.toPath(), destinationFilePath,
-              StandardCopyOption.REPLACE_EXISTING); //StandardCopyOption.ATOMIC_MOVE);
-    } catch (IOException iox) {
-      if (!tempFile.delete()) {
-        tempFile.deleteOnExit();
-      }
-      throw iox;
-    }
-  }
-
-  @VisibleForTesting
-  static void atomicWriteToFile(String content, File file) throws IOException {
-    atomicWriteToFile(content, file, null);
-  }
-
 }
\ No newline at end of file
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/FileUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/FileUtils.java
new file mode 100644
index 0000000..1248d7e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/FileUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Set;
+
+public class FileUtils {
+
+  public static void atomicWriteToFile(String content, File file, Set<PosixFilePermission> permissions) throws IOException {
+    FileSystem defaultFileSystem = FileSystems.getDefault();
+    Path destinationFilePath = defaultFileSystem.getPath(file.getCanonicalPath());
+    Path destinationDirectory = destinationFilePath.getParent();
+    Files.createDirectories(destinationDirectory);
+    File tempFile = Files.createTempFile(destinationDirectory, file.getName(), null).toFile();
+    if (permissions != null && !permissions.isEmpty()) {
+      Files.setPosixFilePermissions(tempFile.toPath(), permissions);
+    }
+    try (FileOutputStream out = new FileOutputStream(tempFile)) {
+      IOUtils.write(content, out);
+    } catch (IOException iox) {
+      if (!tempFile.delete()) {
+        tempFile.deleteOnExit();
+      }
+      throw iox;
+    }
+    try {
+      file.getParentFile().mkdirs();
+      Files.move(tempFile.toPath(), destinationFilePath,
+              StandardCopyOption.REPLACE_EXISTING); //StandardCopyOption.ATOMIC_MOVE);
+    } catch (IOException iox) {
+      if (!tempFile.delete()) {
+        tempFile.deleteOnExit();
+      }
+      throw iox;
+    }
+  }
+
+  public static void atomicWriteToFile(String content, File file) throws IOException {
+    atomicWriteToFile(content, file, null);
+  }
+
+  public static String readFromFile(File file) throws IOException {
+    try (FileInputStream is = new FileInputStream(file)) {
+      return IOUtils.toString(is);
+    }
+  }
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
index 6118545..924b182 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
 package org.apache.zeppelin.interpreter.recovery;
 
 import com.google.common.io.Files;
@@ -36,8 +54,7 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
   public void tearDown() throws Exception {
     super.tearDown();
     FileUtils.deleteDirectory(recoveryDir);
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
-            ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getStringValue());
+    System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName());
   }
 
   @Test
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorageTest.java
similarity index 71%
copy from zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
copy to zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorageTest.java
index 6118545..a6d6e2e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorageTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
 package org.apache.zeppelin.interpreter.recovery;
 
 import com.google.common.io.Files;
@@ -19,14 +37,13 @@ import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 
-public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
-
+public class LocalRecoveryStorageTest extends AbstractInterpreterTest {
   private File recoveryDir = null;
 
   @Before
   public void setUp() throws Exception {
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
-        FileSystemRecoveryStorage.class.getName());
+            LocalRecoveryStorage.class.getName());
     recoveryDir = Files.createTempDir();
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
     super.setUp();
@@ -36,8 +53,7 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
   public void tearDown() throws Exception {
     super.tearDown();
     FileUtils.deleteDirectory(recoveryDir);
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
-            ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getStringValue());
+    System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName());
   }
 
   @Test
@@ -48,9 +64,9 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
     Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
     RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
     InterpreterContext context1 = InterpreterContext.builder()
-        .setNoteId("noteId")
-        .setParagraphId("paragraphId")
-        .build();
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .build();
     remoteInterpreter1.interpret("hello", context1);
 
     assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
@@ -67,18 +83,18 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
     Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
     RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
     InterpreterContext context1 = InterpreterContext.builder()
-        .setNoteId("noteId")
-        .setParagraphId("paragraphId")
-        .build();
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .build();
     remoteInterpreter1.interpret("hello", context1);
     assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
 
     Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2");
     RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
     InterpreterContext context2 = InterpreterContext.builder()
-        .setNoteId("noteId")
-        .setParagraphId("paragraphId")
-        .build();
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .build();
     remoteInterpreter2.interpret("hello", context2);
 
     assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size());
@@ -89,5 +105,4 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
     interpreterSetting.close();
     assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
   }
-
 }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/storage/LocalConfigStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/storage/LocalConfigStorageTest.java
index cf0ac63..5352d9b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/storage/LocalConfigStorageTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/storage/LocalConfigStorageTest.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.storage;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.util.FileUtils;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -39,7 +40,7 @@ public class LocalConfigStorageTest {
         final Path destination = Files.createTempFile("test-", "file");
         final File destinationFile = destination.toFile();
         try {
-            LocalConfigStorage.atomicWriteToFile(TEST_STRING, destinationFile);
+            FileUtils.atomicWriteToFile(TEST_STRING, destinationFile);
             try (InputStream is = Files.newInputStream(destination)) {
                 String read = IOUtils.toString(is);
                 assertEquals(TEST_STRING, read);
@@ -56,7 +57,7 @@ public class LocalConfigStorageTest {
         final Path destination = Paths.get(destDir.toString(),"test-" + rnd.nextLong() + "-file");
         final File destinationFile = destination.toFile();
         try {
-            LocalConfigStorage.atomicWriteToFile(TEST_STRING, destinationFile);
+            FileUtils.atomicWriteToFile(TEST_STRING, destinationFile);
             try (InputStream is = Files.newInputStream(destination)) {
                 String read = IOUtils.toString(is);
                 assertEquals(TEST_STRING, read);
@@ -76,7 +77,7 @@ public class LocalConfigStorageTest {
             try (BufferedWriter writer = Files.newBufferedWriter(destination)) {
                 writer.write(TEST_STRING);
             }
-            String read = LocalConfigStorage.readFromFile(destinationFile);
+            String read = FileUtils.readFromFile(destinationFile);
             assertEquals(TEST_STRING, read);
         } finally {
             Files.deleteIfExists(destination);