You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/14 02:28:31 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4727] Fix HDFS Credentials storage

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new ede534b  [ZEPPELIN-4727] Fix HDFS Credentials storage
ede534b is described below

commit ede534bcd153b2b80c19dd212745ee7af907e0cb
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu Apr 2 17:12:48 2020 +0200

    [ZEPPELIN-4727] Fix HDFS Credentials storage
    
    ### What is this PR for?
    With this PR we switch to the abstract class `ConfigStorage` to save and load credentials. Its subclasses can persist to the local disk or to HDFS.
    
    NOTE: Cluster handling of changed user credentials is deliberately simple. The content of `credential.json` is the only communication between cluster members, so credentials are reloaded from storage on every access. This leads to many rereads.
    
    ### What type of PR is it?
    Bug Fix
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4727
    
    ### How should this be tested?
    * **Travis-CI**: https://travis-ci.org/github/Reamer/zeppelin/builds/673728600
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #3718 from Reamer/credentials_storage and squashes the following commits:
    
    6c8dd45e5 [Philipp Dallig] Use ConfigStage in Credentials and handle IOException in CredentialRestApi
    
    (cherry picked from commit c9d3f4513059537e3a7a1edce47998758b8bc5c4)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/rest/CredentialRestApi.java    |  77 ++++++-----
 .../org/apache/zeppelin/server/ZeppelinServer.java |   7 +-
 .../zeppelin/rest/CredentialsRestApiTest.java      |   3 +-
 .../zeppelin/service/NotebookServiceTest.java      |   2 +-
 .../zeppelin/notebook/FileSystemStorage.java       |  22 ++-
 .../org/apache/zeppelin/notebook/Paragraph.java    |  13 +-
 .../org/apache/zeppelin/storage/ConfigStorage.java |   3 -
 .../zeppelin/storage/FileSystemConfigStorage.java  |  27 ++--
 .../zeppelin/storage/LocalConfigStorage.java       |  31 +++--
 .../java/org/apache/zeppelin/user/Credentials.java | 150 ++++++++++-----------
 .../helium/HeliumApplicationFactoryTest.java       |   2 +-
 .../org/apache/zeppelin/notebook/NotebookTest.java |   2 +-
 .../notebook/repo/NotebookRepoSyncTest.java        |   2 +-
 .../org/apache/zeppelin/user/CredentialsTest.java  |   2 +-
 14 files changed, 185 insertions(+), 158 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/CredentialRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/CredentialRestApi.java
index d69349c..3a1e05f 100755
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/CredentialRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/CredentialRestApi.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 @Produces("application/json")
 @Singleton
 public class CredentialRestApi {
-  Logger logger = LoggerFactory.getLogger(CredentialRestApi.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(CredentialRestApi.class);
   private Credentials credentials;
   private AuthenticationService authenticationService;
   private Gson gson = new Gson();
@@ -61,11 +61,9 @@ public class CredentialRestApi {
    *
    * @param message - JSON with entity, username, password.
    * @return JSON with status.OK
-   * @throws IOException
-   * @throws IllegalArgumentException
    */
   @PUT
-  public Response putCredentials(String message) throws IOException, IllegalArgumentException {
+  public Response putCredentials(String message) {
     Map<String, String> messageMap =
         gson.fromJson(message, new TypeToken<Map<String, String>>() {}.getType());
     String entity = messageMap.get("entity");
@@ -75,47 +73,62 @@ public class CredentialRestApi {
     if (Strings.isNullOrEmpty(entity)
         || Strings.isNullOrEmpty(username)
         || Strings.isNullOrEmpty(password)) {
-      return new JsonResponse(Status.BAD_REQUEST).build();
+      return new JsonResponse<>(Status.BAD_REQUEST).build();
     }
 
     String user = authenticationService.getPrincipal();
-    logger.info("Update credentials for user {} entity {}", user, entity);
-    UserCredentials uc = credentials.getUserCredentials(user);
-    uc.putUsernamePassword(entity, new UsernamePassword(username, password));
-    credentials.putUserCredentials(user, uc);
-    return new JsonResponse(Status.OK).build();
+    LOGGER.info("Update credentials for user {} entity {}", user, entity);
+    UserCredentials uc;
+    try {
+      uc = credentials.getUserCredentials(user);
+      uc.putUsernamePassword(entity, new UsernamePassword(username, password));
+      credentials.putUserCredentials(user, uc);
+      return new JsonResponse<>(Status.OK).build();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+      return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR).build();
+    }
   }
 
   /**
    * Get User Credentials list REST API.
    *
    * @return JSON with status.OK
-   * @throws IllegalArgumentException
    */
   @GET
-  public Response getCredentials() throws IllegalArgumentException {
+  public Response getCredentials() {
     String user = authenticationService.getPrincipal();
-    logger.info("getCredentials credentials for user {} ", user);
-    UserCredentials uc = credentials.getUserCredentials(user);
-    return new JsonResponse<>(Status.OK, uc).build();
+    LOGGER.info("getCredentials for user {} ", user);
+    UserCredentials uc;
+    try {
+      uc = credentials.getUserCredentials(user);
+      return new JsonResponse<>(Status.OK, uc).build();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+      return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR).build();
+    }
   }
 
   /**
    * Remove User Credentials REST API.
    *
    * @return JSON with status.OK
-   * @throws IOException
-   * @throws IllegalArgumentException
    */
   @DELETE
-  public Response removeCredentials() throws IOException, IllegalArgumentException {
+  public Response removeCredentials() {
     String user = authenticationService.getPrincipal();
-    logger.info("removeCredentials credentials for user {} ", user);
-    UserCredentials uc = credentials.removeUserCredentials(user);
-    if (uc == null) {
-      return new JsonResponse(Status.NOT_FOUND).build();
+    LOGGER.info("removeCredentials for user {} ", user);
+    UserCredentials uc;
+    try {
+      uc = credentials.removeUserCredentials(user);
+      if (uc == null) {
+        return new JsonResponse<>(Status.NOT_FOUND).build();
+      }
+      return new JsonResponse<>(Status.OK).build();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+      return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR).build();
     }
-    return new JsonResponse(Status.OK).build();
   }
 
   /**
@@ -123,18 +136,20 @@ public class CredentialRestApi {
    *
    * @param
    * @return JSON with status.OK
-   * @throws IOException
-   * @throws IllegalArgumentException
    */
   @DELETE
   @Path("{entity}")
-  public Response removeCredentialEntity(@PathParam("entity") String entity)
-      throws IOException, IllegalArgumentException {
+  public Response removeCredentialEntity(@PathParam("entity") String entity) {
     String user = authenticationService.getPrincipal();
-    logger.info("removeCredentialEntity for user {} entity {}", user, entity);
-    if (!credentials.removeCredentialEntity(user, entity)) {
-      return new JsonResponse(Status.NOT_FOUND).build();
+    LOGGER.info("removeCredentialEntity for user {} entity {}", user, entity);
+    try {
+      if (!credentials.removeCredentialEntity(user, entity)) {
+        return new JsonResponse<>(Status.NOT_FOUND).build();
+      }
+      return new JsonResponse<>(Status.OK).build();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+      return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR).build();
     }
-    return new JsonResponse(Status.OK).build();
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 493d7e7..94ffca0 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -145,12 +145,7 @@ public class ZeppelinServer extends ResourceConfig {
         new AbstractBinder() {
           @Override
           protected void configure() {
-            Credentials credentials =
-                new Credentials(
-                    conf.credentialsPersist(),
-                    conf.getCredentialsPath(),
-                    conf.getCredentialsEncryptKey());
-
+            Credentials credentials = new Credentials(conf);
             bindAsContract(InterpreterFactory.class).in(Singleton.class);
             bindAsContract(NotebookRepoSync.class).to(NotebookRepo.class).in(Immediate.class);
             bind(LuceneSearch.class).to(SearchService.class).in(Singleton.class);
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/CredentialsRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/CredentialsRestApiTest.java
index 3af42a0..f6f00a1 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/CredentialsRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/CredentialsRestApiTest.java
@@ -44,8 +44,7 @@ public class CredentialsRestApiTest {
 
   @Before
   public void setUp() throws IOException {
-    credentials =
-        new Credentials(false, Files.createTempFile("credentials", "test").toString(), null);
+    credentials = new Credentials();
     authenticationService = new NoAuthenticationService();
     credentialRestApi = new CredentialRestApi(credentials, authenticationService);
   }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index 47bbfde..cdde980 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -115,7 +115,7 @@ public class NotebookServiceTest {
     when(mockInterpreterGroup.getInterpreterSetting()).thenReturn(mockInterpreterSetting);
     when(mockInterpreterSetting.getStatus()).thenReturn(InterpreterSetting.Status.READY);
     SearchService searchService = new LuceneSearch(zeppelinConfiguration);
-    Credentials credentials = new Credentials(false, null, null);
+    Credentials credentials = new Credentials();
     NoteManager noteManager = new NoteManager(notebookRepo);
     AuthorizationService authorizationService = new AuthorizationService(noteManager, zeppelinConfiguration);
     Notebook notebook =
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
index ddf28be..40e7113 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
@@ -6,6 +6,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -14,13 +15,17 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FilePermission;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 
 /**
@@ -93,7 +98,7 @@ public class FileSystemStorage {
 
     this.fs = FileSystem.get(zepConfigURI, this.hadoopConf);
   }
-  
+
   public boolean isS3AFileSystem(URI defaultFSURI, URI zepConfigURI) {
     return defaultFSURI.getScheme().equals(S3A)
       || (StringUtils.isNotEmpty(zepConfigURI.getScheme())
@@ -201,6 +206,20 @@ public class FileSystemStorage {
 
   public void writeFile(final String content, final Path file, boolean writeTempFileFirst)
       throws IOException {
+      writeFile(content, file, writeTempFileFirst, null);
+  }
+
+  public void writeFile(final String content, final Path file, boolean writeTempFileFirst, Set<PosixFilePermission> permissions)
+      throws IOException {
+    FsPermission fsPermission;
+    if (permissions == null || permissions.isEmpty()) {
+      fsPermission = FsPermission.getFileDefault();
+    } else {
+      // FsPermission expects a 10-character string because of the leading
+      // directory indicator, i.e. "drwx------". The JDK toString method returns
+      // a 9-character string, so prepend a leading character.
+      fsPermission = FsPermission.valueOf("-" + PosixFilePermissions.toString(permissions));
+    }
     callHdfsOperation(new HdfsOperation<Void>() {
       @Override
       public Void call() throws IOException {
@@ -208,6 +227,7 @@ public class FileSystemStorage {
             zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
         Path tmpFile = new Path(file.toString() + ".tmp");
         IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf);
+        fs.setPermission(tmpFile, fsPermission);
         fs.delete(file, true);
         fs.rename(tmpFile, file);
         return null;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 00de528..096df6d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -526,13 +526,17 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
 
     Credentials credentials = note.getCredentials();
     if (subject != null) {
-      UserCredentials userCredentials =
-          credentials.getUserCredentials(subject.getUser());
+      UserCredentials userCredentials;
+      try {
+        userCredentials = credentials.getUserCredentials(subject.getUser());
+      } catch (IOException e) {
+        LOGGER.warn("Unable to get Usercredentials. Working with empty UserCredentials", e);
+        userCredentials = new UserCredentials();
+      }
       subject.setUserCredentials(userCredentials);
     }
 
-    InterpreterContext interpreterContext =
-        InterpreterContext.builder()
+    return InterpreterContext.builder()
             .setNoteId(note.getId())
             .setNoteName(note.getName())
             .setParagraphId(getId())
@@ -547,7 +551,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
             .setAngularObjectRegistry(registry)
             .setResourcePool(resourcePool)
             .build();
-    return interpreterContext;
   }
 
   public void setStatusToUserParagraph(Status status) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/ConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/ConfigStorage.java
index b3175e5..f4a4f3e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/ConfigStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/ConfigStorage.java
@@ -22,12 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.helium.HeliumConf;
 import org.apache.zeppelin.interpreter.InterpreterInfoSaving;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving;
-import org.apache.zeppelin.user.Credentials;
-import org.apache.zeppelin.user.CredentialsInfoSaving;
 import org.apache.zeppelin.util.ReflectionUtils;
 
 import java.io.IOException;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/FileSystemConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/FileSystemConfigStorage.java
index 20c19b6..03883d1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/FileSystemConfigStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/FileSystemConfigStorage.java
@@ -18,20 +18,18 @@
 
 package org.apache.zeppelin.storage;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 import org.apache.hadoop.fs.Path;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.helium.HeliumConf;
 import org.apache.zeppelin.interpreter.InterpreterInfoSaving;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.FileSystemStorage;
 import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving;
-import org.apache.zeppelin.user.CredentialsInfoSaving;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.EnumSet;
+import java.util.Set;
 
 /**
  * It could be used either local file system or hadoop distributed file system,
@@ -50,8 +48,7 @@ public class FileSystemConfigStorage extends ConfigStorage {
   public FileSystemConfigStorage(ZeppelinConfiguration zConf) throws IOException {
     super(zConf);
     this.fs = new FileSystemStorage(zConf, zConf.getConfigFSDir());
-    LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() +
-        " for Zeppelin Config");
+    LOGGER.info("Creating FileSystem: {} for Zeppelin Config", this.fs.getFs().getClass().getName());
     Path configPath = this.fs.makeQualified(new Path(zConf.getConfigFSDir()));
     this.fs.tryMkDir(configPath);
     LOGGER.info("Using folder {} to store Zeppelin Config", configPath);
@@ -62,7 +59,7 @@ public class FileSystemConfigStorage extends ConfigStorage {
 
   @Override
   public void save(InterpreterInfoSaving settingInfos) throws IOException {
-    LOGGER.info("Save Interpreter Settings to " + interpreterSettingPath);
+    LOGGER.info("Save Interpreter Settings to {}", interpreterSettingPath);
     fs.writeFile(settingInfos.toJson(), interpreterSettingPath, false);
   }
 
@@ -72,13 +69,14 @@ public class FileSystemConfigStorage extends ConfigStorage {
       LOGGER.warn("Interpreter Setting file {} is not existed", interpreterSettingPath);
       return null;
     }
-    LOGGER.info("Load Interpreter Setting from file: " + interpreterSettingPath);
+    LOGGER.info("Load Interpreter Setting from file: {}", interpreterSettingPath);
     String json = fs.readFile(interpreterSettingPath);
     return buildInterpreterInfoSaving(json);
   }
 
+  @Override
   public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException {
-    LOGGER.info("Save notebook authorization to file: " + authorizationPath);
+    LOGGER.info("Save notebook authorization to file: {}", authorizationPath);
     fs.writeFile(authorizationInfoSaving.toJson(), authorizationPath, false);
   }
 
@@ -88,7 +86,7 @@ public class FileSystemConfigStorage extends ConfigStorage {
       LOGGER.warn("Notebook Authorization file {} is not existed", authorizationPath);
       return null;
     }
-    LOGGER.info("Load notebook authorization from file: " + authorizationPath);
+    LOGGER.info("Load notebook authorization from file: {}", authorizationPath);
     String json = this.fs.readFile(authorizationPath);
     return NotebookAuthorizationInfoSaving.fromJson(json);
   }
@@ -99,14 +97,15 @@ public class FileSystemConfigStorage extends ConfigStorage {
       LOGGER.warn("Credential file {} is not existed", credentialPath);
       return null;
     }
-    LOGGER.info("Load Credential from file: " + credentialPath);
+    LOGGER.info("Load Credential from file: {}", credentialPath);
     return this.fs.readFile(credentialPath);
   }
 
   @Override
   public void saveCredentials(String credentials) throws IOException {
-    LOGGER.info("Save Credentials to file: " + credentialPath);
-    fs.writeFile(credentials, credentialPath, false);
+    LOGGER.info("Save Credentials to file: {}", credentialPath);
+    Set<PosixFilePermission> permissions = EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
+    fs.writeFile(credentials, credentialPath, false, permissions);
   }
 
 }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
index 7cb3ba6..1f2eb3c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
@@ -33,15 +33,17 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.FileSystems;
 import java.nio.file.FileSystem;
-import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.EnumSet;
+import java.util.Set;
 
 /**
  * Storing config in local file system
  */
 public class LocalConfigStorage extends ConfigStorage {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(LocalConfigStorage.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalConfigStorage.class);
 
   private File interpreterSettingPath;
   private File authorizationPath;
@@ -56,7 +58,7 @@ public class LocalConfigStorage extends ConfigStorage {
 
   @Override
   public void save(InterpreterInfoSaving settingInfos) throws IOException {
-    LOGGER.info("Save Interpreter Setting to " + interpreterSettingPath.getAbsolutePath());
+    LOGGER.info("Save Interpreter Setting to {}", interpreterSettingPath.getAbsolutePath());
     atomicWriteToFile(settingInfos.toJson(), interpreterSettingPath);
   }
 
@@ -66,14 +68,14 @@ public class LocalConfigStorage extends ConfigStorage {
       LOGGER.warn("Interpreter Setting file {} is not existed", interpreterSettingPath);
       return null;
     }
-    LOGGER.info("Load Interpreter Setting from file: " + interpreterSettingPath);
+    LOGGER.info("Load Interpreter Setting from file: {}", interpreterSettingPath);
     String json = readFromFile(interpreterSettingPath);
     return buildInterpreterInfoSaving(json);
   }
 
   @Override
   public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException {
-    LOGGER.info("Save notebook authorization to file: " + authorizationPath);
+    LOGGER.info("Save notebook authorization to file: {}", authorizationPath);
     atomicWriteToFile(authorizationInfoSaving.toJson(), authorizationPath);
   }
 
@@ -83,7 +85,7 @@ public class LocalConfigStorage extends ConfigStorage {
       LOGGER.warn("NotebookAuthorization file {} is not existed", authorizationPath);
       return null;
     }
-    LOGGER.info("Load notebook authorization from file: " + authorizationPath);
+    LOGGER.info("Load notebook authorization from file: {}", authorizationPath);
     String json = readFromFile(authorizationPath);
     return NotebookAuthorizationInfoSaving.fromJson(json);
   }
@@ -94,14 +96,15 @@ public class LocalConfigStorage extends ConfigStorage {
       LOGGER.warn("Credential file {} is not existed", credentialPath);
       return null;
     }
-    LOGGER.info("Load Credential from file: " + credentialPath);
+    LOGGER.info("Load Credential from file: {}", credentialPath);
     return readFromFile(credentialPath);
   }
 
   @Override
   public void saveCredentials(String credentials) throws IOException {
-    LOGGER.info("Save Credentials to file: " + credentialPath);
-    atomicWriteToFile(credentials, credentialPath);
+    LOGGER.info("Save Credentials to file: {}", credentialPath);
+    Set<PosixFilePermission> permissions = EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
+    atomicWriteToFile(credentials, credentialPath, permissions);
   }
 
   @VisibleForTesting
@@ -112,12 +115,15 @@ public class LocalConfigStorage extends ConfigStorage {
   }
 
   @VisibleForTesting
-  static void atomicWriteToFile(String content, File file) throws IOException {
+  static void atomicWriteToFile(String content, File file, Set<PosixFilePermission> permissions) throws IOException {
     FileSystem defaultFileSystem = FileSystems.getDefault();
     Path destinationFilePath = defaultFileSystem.getPath(file.getCanonicalPath());
     Path destinationDirectory = destinationFilePath.getParent();
     Files.createDirectories(destinationDirectory);
     File tempFile = Files.createTempFile(destinationDirectory, file.getName(), null).toFile();
+    if (permissions != null && !permissions.isEmpty()) {
+      Files.setPosixFilePermissions(tempFile.toPath(), permissions);
+    }
     try (FileOutputStream out = new FileOutputStream(tempFile)) {
       IOUtils.write(content, out);
     } catch (IOException iox) {
@@ -138,4 +144,9 @@ public class LocalConfigStorage extends ConfigStorage {
     }
   }
 
+  @VisibleForTesting
+  static void atomicWriteToFile(String content, File file) throws IOException {
+    atomicWriteToFile(content, file, null);
+  }
+
 }
\ No newline at end of file
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/user/Credentials.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/user/Credentials.java
index 61f7fff..0cc68ff 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/user/Credentials.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/user/Credentials.java
@@ -18,69 +18,79 @@
 package org.apache.zeppelin.user;
 
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.file.Files;
-import java.nio.file.attribute.PosixFilePermission;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.storage.ConfigStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 
 /**
  * Class defining credentials for data source authorization
  */
 public class Credentials {
+
   private static final Logger LOG = LoggerFactory.getLogger(Credentials.class);
 
+  private ConfigStorage storage;
   private Map<String, UserCredentials> credentialsMap;
   private Gson gson;
-  private Boolean credentialsPersist = true;
-  File credentialsFile;
-
   private Encryptor encryptor;
-  
+
   /**
-   * Wrapper fro user credentials. It can load credentials from a file if credentialsPath is
-   * supplied, and will encrypt the file if an encryptKey is supplied.
+   * Wrapper for user credentials. It can load credentials from a file
+   * and will encrypt the file if an encryptKey is configured.
    *
-   * @param credentialsPersist
-   * @param credentialsPath
-   * @param encryptKey
+   * @param conf
+   * @throws IOException
    */
-  public Credentials(Boolean credentialsPersist, String credentialsPath, String encryptKey) {
-    if (encryptKey != null) {
-      this.encryptor = new Encryptor(encryptKey);
+  public Credentials(ZeppelinConfiguration conf) {
+    credentialsMap = new HashMap<>();
+    if (conf.credentialsPersist().booleanValue()) {
+      String encryptKey = conf.getCredentialsEncryptKey();
+      if (StringUtils.isNotBlank(encryptKey)) {
+        this.encryptor = new Encryptor(encryptKey);
+      }
+      try {
+        storage = ConfigStorage.getInstance(conf);
+        GsonBuilder builder = new GsonBuilder();
+        builder.setPrettyPrinting();
+        gson = builder.create();
+        loadFromFile();
+      } catch (IOException e) {
+        LOG.error("Fail to create ConfigStorage for Credentials. Persistenz will be disabled", e);
+        encryptor = null;
+        storage = null;
+        gson = null;
+      }
+    } else {
+      encryptor = null;
+      storage = null;
+      gson = null;
     }
+  }
 
-    this.credentialsPersist = credentialsPersist;
-    if (credentialsPath != null) {
-      credentialsFile = new File(credentialsPath);
-    }
+  /**
+   * Wrapper for inmemory user credentials.
+   *
+   * @param conf
+   * @throws IOException
+   */
+  public Credentials() {
     credentialsMap = new HashMap<>();
-
-    if (credentialsPersist) {
-      GsonBuilder builder = new GsonBuilder();
-      builder.setPrettyPrinting();
-      gson = builder.create();
-      loadFromFile();
-    }
+    encryptor = null;
+    storage = null;
+    gson = null;
   }
 
-  public UserCredentials getUserCredentials(String username) {
+  public UserCredentials getUserCredentials(String username) throws IOException {
+    loadCredentials();
     UserCredentials uc = credentialsMap.get(username);
     if (uc == null) {
       uc = new UserCredentials();
@@ -89,20 +99,22 @@ public class Credentials {
   }
 
   public void putUserCredentials(String username, UserCredentials uc) throws IOException {
+    loadCredentials();
     credentialsMap.put(username, uc);
     saveCredentials();
   }
 
   public UserCredentials removeUserCredentials(String username) throws IOException {
-    UserCredentials uc;
-    uc = credentialsMap.remove(username);
+    loadCredentials();
+    UserCredentials uc = credentialsMap.remove(username);
     saveCredentials();
     return uc;
   }
 
   public boolean removeCredentialEntity(String username, String entity) throws IOException {
+    loadCredentials();
     UserCredentials uc = credentialsMap.get(username);
-    if (uc != null && uc.existUsernamePassword(entity) == false) {
+    if (uc == null || !uc.existUsernamePassword(entity)) {
       return false;
     }
 
@@ -112,41 +124,30 @@ public class Credentials {
   }
 
   public void saveCredentials() throws IOException {
-    if (credentialsPersist) {
+    if (storage != null) {
       saveToFile();
     }
   }
 
-  private void loadFromFile() {
-    LOG.info(credentialsFile.getAbsolutePath());
-    if (!credentialsFile.exists()) {
-      // nothing to read
-      return;
+  private void loadCredentials() throws IOException {
+    if (storage != null) {
+      loadFromFile();
     }
+  }
 
+  private void loadFromFile() throws IOException {
     try {
-      FileInputStream fis = new FileInputStream(credentialsFile);
-      InputStreamReader isr = new InputStreamReader(fis);
-      BufferedReader bufferedReader = new BufferedReader(isr);
-      StringBuilder sb = new StringBuilder();
-      String line;
-      while ((line = bufferedReader.readLine()) != null) {
-        sb.append(line);
-      }
-      isr.close();
-      fis.close();
-
-      String json = sb.toString();
-
+      String json = storage.loadCredentials();
       if (encryptor != null) {
         json = encryptor.decrypt(json);
       }
 
       CredentialsInfoSaving info = CredentialsInfoSaving.fromJson(json);
-      this.credentialsMap = info.credentialsMap;
+      if (info != null) {
+        this.credentialsMap = info.credentialsMap;
+      }
     } catch (IOException e) {
-      LOG.error("Error loading credentials file", e);
-      e.printStackTrace();
+      throw new IOException("Error loading credentials file", e);
     }
   }
 
@@ -160,25 +161,12 @@ public class Credentials {
     }
 
     try {
-      if (!credentialsFile.exists()) {
-        credentialsFile.createNewFile();
-
-        Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
-        Files.setPosixFilePermissions(credentialsFile.toPath(), permissions);
-      }
-
-      FileOutputStream fos = new FileOutputStream(credentialsFile, false);
-      OutputStreamWriter out = new OutputStreamWriter(fos);
-
       if (encryptor != null) {
         jsonString = encryptor.encrypt(jsonString);
       }
-
-      out.append(jsonString);
-      out.close();
-      fos.close();
+      storage.saveCredentials(jsonString);
     } catch (IOException e) {
-      LOG.error("Error saving credentials file", e);
+      throw new IOException("Error saving credentials file", e);
     }
   }
 }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index 1cafc23..31d4c2b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -70,7 +70,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest {
             interpreterFactory,
             interpreterSettingManager,
             search,
-            new Credentials(false, null, null));
+            new Credentials());
 
     heliumAppFactory = new HeliumApplicationFactory(notebook, null);
 
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index e35079c..5a4752b 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -97,7 +97,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     noteManager = new NoteManager(notebookRepo);
     authorizationService = new AuthorizationService(noteManager, conf);
 
-    credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath(), null);
+    credentials = new Credentials(conf);
     notebook = new Notebook(conf, authorizationService, notebookRepo, noteManager, interpreterFactory, interpreterSettingManager, search,
             credentials, null);
     notebook.setParagraphJobListener(this);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index fc3c0aa..5dfd5da 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -102,7 +102,7 @@ public class NotebookRepoSyncTest {
     notebookRepoSync = new NotebookRepoSync(conf);
     noteManager = new NoteManager(notebookRepoSync);
     authorizationService = new AuthorizationService(noteManager, conf);
-    credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath(), null);
+    credentials = new Credentials(conf);
     notebook = new Notebook(conf, authorizationService, notebookRepoSync, noteManager, factory, interpreterSettingManager, search, credentials, null);
     anonymous = new AuthenticationInfo("anonymous");
   }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/user/CredentialsTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/user/CredentialsTest.java
index 84a1244..8b59003 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/user/CredentialsTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/user/CredentialsTest.java
@@ -27,7 +27,7 @@ public class CredentialsTest {
 
   @Test
   public void testDefaultProperty() throws IOException {
-    Credentials credentials = new Credentials(false, null, null);
+    Credentials credentials = new Credentials();
     UserCredentials userCredentials = new UserCredentials();
     UsernamePassword up1 = new UsernamePassword("user2", "password");
     userCredentials.putUsernamePassword("hive(vertica)", up1);