Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/30 15:33:45 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 03379c7  [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
03379c7 is described below

commit 03379c70cdb644749dd06f7135b3fa017cda1cf7
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 8 10:02:45 2020 +0800

    [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
    
    ### What is this PR for?
    User credentials only work in the default jdbc interpreter. If a user creates a new jdbc interpreter, e.g. hive, then credentials don't work. This PR fixes this and also does some code refactoring to make the jdbc interpreter module more readable.
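
    For illustration, a minimal sketch of how the credential entity is now resolved (it mirrors the new getEntityName in JDBCInterpreter further down in this diff; the %hive interpreter and the "hive" credential entry are only examples):

        // Which credential entity a paragraph looks up:
        // - for the default %jdbc interpreter, the db prefix is used (e.g. %jdbc(db=mysql) -> "mysql"),
        // - for a renamed jdbc interpreter (e.g. %hive), the interpreter name itself is used,
        //   so a credential entry named "hive" now applies instead of the old "jdbc.hive".
        private String getEntityName(String replName, String propertyKey) {
          if ("jdbc".equals(replName)) {
            return propertyKey;
          } else {
            return replName;
          }
        }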
    
    ### What type of PR is it?
    [Bug Fix | Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4863
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3806 from zjffdu/ZEPPELIN-4863 and squashes the following commits:
    
    9fd10eeb2 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
    abb00b376 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
    
    (cherry picked from commit 55c1d276746b9bbf5458ca92cb55b2aeb0bae595)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  | 193 +++++++++++----------
 .../zeppelin/jdbc/JDBCUserConfigurations.java      |  48 +++--
 jdbc/src/main/resources/interpreter-setting.json   |  14 +-
 .../apache/zeppelin/jdbc/JDBCInterpreterTest.java  | 166 ++++++++++--------
 .../org/apache/zeppelin/notebook/Paragraph.java    |   7 +-
 5 files changed, 235 insertions(+), 193 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index bdda696..5dcf4d0 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -97,7 +97,7 @@ import org.apache.zeppelin.user.UsernamePassword;
  * </p>
  */
 public class JDBCInterpreter extends KerberosInterpreter {
-  private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(JDBCInterpreter.class);
 
   static final String INTERPRETER_NAME = "jdbc";
   static final String COMMON_KEY = "common";
@@ -152,7 +152,9 @@ public class JDBCInterpreter extends KerberosInterpreter {
           "KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath",
           "extraCredentials", "roles", "sessionProperties"));
 
+  // database --> Properties
   private final HashMap<String, Properties> basePropertiesMap;
+  // username --> User Configuration
   private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
   private final HashMap<String, SqlCompleter> sqlCompletersMap;
 
@@ -189,23 +191,19 @@ public class JDBCInterpreter extends KerberosInterpreter {
         return true;
       }
     } catch (Exception e) {
-      logger.error("Unable to run kinit for zeppelin", e);
+      LOGGER.error("Unable to run kinit for zeppelin", e);
     }
     return false;
   }
 
-  public HashMap<String, Properties> getPropertiesMap() {
-    return basePropertiesMap;
-  }
-
   @Override
   public void open() {
     super.open();
     for (String propertyKey : properties.stringPropertyNames()) {
-      logger.debug("propertyKey: {}", propertyKey);
+      LOGGER.debug("propertyKey: {}", propertyKey);
       String[] keyValue = propertyKey.split("\\.", 2);
       if (2 == keyValue.length) {
-        logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
+        LOGGER.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
 
         Properties prefixProperties;
         if (basePropertiesMap.containsKey(keyValue[0])) {
@@ -223,7 +221,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
       if (!COMMON_KEY.equals(key)) {
         Properties properties = basePropertiesMap.get(key);
         if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
-          logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
+          LOGGER.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
               key, DRIVER_KEY, key, key, URL_KEY);
           removeKeySet.add(key);
         }
@@ -233,7 +231,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
     for (String key : removeKeySet) {
       basePropertiesMap.remove(key);
     }
-    logger.debug("JDBC PropretiesMap: {}", basePropertiesMap);
+    LOGGER.debug("JDBC PropertiesMap: {}", basePropertiesMap);
 
     setMaxLineResults();
     setMaxRows();
@@ -295,12 +293,12 @@ public class JDBCInterpreter extends KerberosInterpreter {
       // protection to release connection
       executorService.awaitTermination(3, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      logger.warn("Completion timeout", e);
+      LOGGER.warn("Completion timeout", e);
       if (connection != null) {
         try {
           connection.close();
         } catch (SQLException e1) {
-          logger.warn("Error close connection", e1);
+          LOGGER.warn("Error close connection", e1);
         }
       }
     }
@@ -312,7 +310,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
       try {
         configurations.initStatementMap();
       } catch (Exception e) {
-        logger.error("Error while closing paragraphIdStatementMap statement...", e);
+        LOGGER.error("Error while closing paragraphIdStatementMap statement...", e);
       }
     }
   }
@@ -322,13 +320,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
       try {
         closeDBPool(key, DEFAULT_KEY);
       } catch (SQLException e) {
-        logger.error("Error while closing database pool.", e);
+        LOGGER.error("Error while closing database pool.", e);
       }
       try {
         JDBCUserConfigurations configurations = jdbcUserConfigurationsMap.get(key);
         configurations.initConnectionPoolMap();
       } catch (SQLException e) {
-        logger.error("Error while closing initConnectionPoolMap.", e);
+        LOGGER.error("Error while closing initConnectionPoolMap.", e);
       }
     }
   }
@@ -340,22 +338,22 @@ public class JDBCInterpreter extends KerberosInterpreter {
       initStatementMap();
       initConnectionPoolMap();
     } catch (Exception e) {
-      logger.error("Error while closing...", e);
+      LOGGER.error("Error while closing...", e);
     }
   }
 
-  private String getEntityName(String replName) {
-    StringBuffer entityName = new StringBuffer();
-    entityName.append(INTERPRETER_NAME);
-    entityName.append(".");
-    entityName.append(replName);
-    return entityName.toString();
+  private String getEntityName(String replName, String propertyKey) {
+    if ("jdbc".equals(replName)) {
+      return propertyKey;
+    } else {
+      return replName;
+    }
   }
 
-  private String getJDBCDriverName(String user, String propertyKey) {
+  private String getJDBCDriverName(String user, String dbPrefix) {
     StringBuffer driverName = new StringBuffer();
     driverName.append(DBCP_STRING);
-    driverName.append(propertyKey);
+    driverName.append(dbPrefix);
     driverName.append(user);
     return driverName.toString();
   }
@@ -367,10 +365,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
   }
 
   private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
-                                               String replName) {
+                                               String entity) {
     UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials();
     if (uc != null) {
-      return uc.getUsernamePassword(replName);
+      return uc.getUsernamePassword(entity);
     }
     return null;
   }
@@ -393,36 +391,36 @@ public class JDBCInterpreter extends KerberosInterpreter {
     }
   }
 
-  private void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
+  private void setUserProperty(String dbPrefix, InterpreterContext context)
       throws SQLException, IOException, InterpreterException {
 
-    String user = interpreterContext.getAuthenticationInfo().getUser();
+    String user = context.getAuthenticationInfo().getUser();
 
     JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
-    if (basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
-        !basePropertiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
-      String password = getPassword(basePropertiesMap.get(propertyKey));
+    if (basePropertiesMap.get(dbPrefix).containsKey(USER_KEY) &&
+        !basePropertiesMap.get(dbPrefix).getProperty(USER_KEY).isEmpty()) {
+      String password = getPassword(basePropertiesMap.get(dbPrefix));
       if (!isEmpty(password)) {
-        basePropertiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
+        basePropertiesMap.get(dbPrefix).setProperty(PASSWORD_KEY, password);
       }
     }
-    jdbcUserConfigurations.setPropertyMap(propertyKey, basePropertiesMap.get(propertyKey));
-    if (existAccountInBaseProperty(propertyKey)) {
+    jdbcUserConfigurations.setPropertyMap(dbPrefix, basePropertiesMap.get(dbPrefix));
+    if (existAccountInBaseProperty(dbPrefix)) {
       return;
     }
-    jdbcUserConfigurations.cleanUserProperty(propertyKey);
+    jdbcUserConfigurations.cleanUserProperty(dbPrefix);
 
-    UsernamePassword usernamePassword = getUsernamePassword(interpreterContext,
-            getEntityName(interpreterContext.getReplName()));
+    UsernamePassword usernamePassword = getUsernamePassword(context,
+            getEntityName(context.getReplName(), dbPrefix));
     if (usernamePassword != null) {
-      jdbcUserConfigurations.setUserProperty(propertyKey, usernamePassword);
+      jdbcUserConfigurations.setUserProperty(dbPrefix, usernamePassword);
     } else {
-      closeDBPool(user, propertyKey);
+      closeDBPool(user, dbPrefix);
     }
   }
 
-  private void createConnectionPool(String url, String user, String propertyKey,
-      Properties properties) throws SQLException, ClassNotFoundException, IOException {
+  private void createConnectionPool(String url, String user, String dbPrefix,
+      Properties properties) throws SQLException, ClassNotFoundException {
 
     String driverClass = properties.getProperty(DRIVER_KEY);
     if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver")
@@ -449,62 +447,62 @@ public class JDBCInterpreter extends KerberosInterpreter {
     poolableConnectionFactory.setPool(connectionPool);
     Class.forName(driverClass);
     PoolingDriver driver = new PoolingDriver();
-    driver.registerPool(propertyKey + user, connectionPool);
-    getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver);
+    driver.registerPool(dbPrefix + user, connectionPool);
+    getJDBCConfiguration(user).saveDBDriverPool(dbPrefix, driver);
   }
 
-  private Connection getConnectionFromPool(String url, String user, String propertyKey,
-      Properties properties) throws SQLException, ClassNotFoundException, IOException {
-    String jdbcDriver = getJDBCDriverName(user, propertyKey);
+  private Connection getConnectionFromPool(String url, String user, String dbPrefix,
+      Properties properties) throws SQLException, ClassNotFoundException {
+    String jdbcDriver = getJDBCDriverName(user, dbPrefix);
 
-    if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
-      createConnectionPool(url, user, propertyKey, properties);
+    if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(dbPrefix)) {
+      createConnectionPool(url, user, dbPrefix, properties);
     }
     return DriverManager.getConnection(jdbcDriver);
   }
 
-  public Connection getConnection(String propertyKey, InterpreterContext interpreterContext)
+  public Connection getConnection(String dbPrefix, InterpreterContext context)
       throws ClassNotFoundException, SQLException, InterpreterException, IOException {
-    final String user =  interpreterContext.getAuthenticationInfo().getUser();
+    final String user =  context.getAuthenticationInfo().getUser();
     Connection connection;
-    if (propertyKey == null || basePropertiesMap.get(propertyKey) == null) {
+    if (dbPrefix == null || basePropertiesMap.get(dbPrefix) == null) {
       return null;
     }
 
     JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
-    setUserProperty(propertyKey, interpreterContext);
+    setUserProperty(dbPrefix, context);
 
-    final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
+    final Properties properties = jdbcUserConfigurations.getPropertyMap(dbPrefix);
     final String url = properties.getProperty(URL_KEY);
 
     if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
-      connection = getConnectionFromPool(url, user, propertyKey, properties);
+      connection = getConnectionFromPool(url, user, dbPrefix, properties);
     } else {
       UserGroupInformation.AuthenticationMethod authType =
           JDBCSecurityImpl.getAuthtype(getProperties());
 
-      final String connectionUrl = appendProxyUserToURL(url, user, propertyKey);
+      final String connectionUrl = appendProxyUserToURL(url, user, dbPrefix);
 
       JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType);
       switch (authType) {
         case KERBEROS:
           if (user == null || "false".equalsIgnoreCase(
               getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
-            connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+            connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
           } else {
-            if (basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {
-              connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+            if (basePropertiesMap.get(dbPrefix).containsKey("proxy.user.property")) {
+              connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
             } else {
               UserGroupInformation ugi = null;
               try {
                 ugi = UserGroupInformation.createProxyUser(
                     user, UserGroupInformation.getCurrentUser());
               } catch (Exception e) {
-                logger.error("Error in getCurrentUser", e);
+                LOGGER.error("Error in getCurrentUser", e);
                 throw new InterpreterException("Error in getCurrentUser", e);
               }
 
-              final String poolKey = propertyKey;
+              final String poolKey = dbPrefix;
               try {
                 connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
                   @Override
@@ -513,7 +511,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
                   }
                 });
               } catch (Exception e) {
-                logger.error("Error in doAs", e);
+                LOGGER.error("Error in doAs", e);
                 throw new InterpreterException("Error in doAs", e);
               }
             }
@@ -521,7 +519,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
           break;
 
         default:
-          connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+          connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
       }
     }
 
@@ -538,13 +536,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
       if (lastIndexOfUrl == -1) {
         lastIndexOfUrl = connectionUrl.length();
       }
-      logger.info("Using proxy user as :" + user);
-      logger.info("Using proxy property for user as :" +
+      LOGGER.info("Using proxy user as: {}", user);
+      LOGGER.info("Using proxy property for user as: {}",
           basePropertiesMap.get(propertyKey).getProperty("proxy.user.property"));
       connectionUrl.insert(lastIndexOfUrl, ";" +
           basePropertiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
     } else if (user != null && !user.equals("anonymous") && url.contains("hive")) {
-      logger.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
+      LOGGER.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
           ".org/docs/latest/interpreter/jdbc.html#apache-hive");
     }
 
@@ -570,9 +568,9 @@ public class JDBCInterpreter extends KerberosInterpreter {
               + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY));
         }
       } catch (Exception e) {
-        logger.error("Failed to retrieve password from JCEKS \n" +
-            "For file: " + properties.getProperty(JDBC_JCEKS_FILE) +
-            "\nFor key: " + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e);
+        LOGGER.error("Failed to retrieve password from JCEKS \n" +
+            "For file: {} \nFor key: {}", properties.getProperty(JDBC_JCEKS_FILE),
+                properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e);
         throw e;
       }
     }
@@ -661,7 +659,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return sqlSplitter.splitSql(text);
   }
 
-  private InterpreterResult executeSql(String propertyKey, String sql,
+  /**
+   * Execute the sql statement under this dbPrefix.
+   *
+   * @param dbPrefix the db prefix (property group) whose connection is used
+   * @param sql the sql statement(s) to execute
+   * @param context the interpreter context of the current paragraph
+   * @return the interpreter result of the execution
+   * @throws InterpreterException
+   */
+  private InterpreterResult executeSql(String dbPrefix, String sql,
       InterpreterContext context) throws InterpreterException {
     Connection connection = null;
     Statement statement;
@@ -670,13 +677,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
     String user = context.getAuthenticationInfo().getUser();
 
     try {
-      connection = getConnection(propertyKey, context);
+      connection = getConnection(dbPrefix, context);
     } catch (Exception e) {
       String errorMsg = ExceptionUtils.getStackTrace(e);
       try {
-        closeDBPool(user, propertyKey);
+        closeDBPool(user, dbPrefix);
       } catch (SQLException e1) {
-        logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
+        LOGGER.error("Cannot close DBPool for user, dbPrefix: " + user + dbPrefix, e1);
       }
       try {
         context.out.write(errorMsg);
@@ -706,20 +713,20 @@ public class JDBCInterpreter extends KerberosInterpreter {
           getJDBCConfiguration(user).saveStatement(paragraphId, statement);
 
           String statementPrecode =
-              getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, propertyKey));
+              getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, dbPrefix));
 
           if (StringUtils.isNotBlank(statementPrecode)) {
             statement.execute(statementPrecode);
           }
 
           // start hive monitor thread if it is hive jdbc
-          if (getJDBCConfiguration(user).getPropertyMap(propertyKey).getProperty(URL_KEY)
+          if (getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY)
                   .startsWith("jdbc:hive2://")) {
             HiveUtils.startHiveMonitorThread(statement, context,
                     Boolean.parseBoolean(getProperty("hive.log.display", "true")));
           }
           boolean isResultSetAvailable = statement.execute(sqlToExecute);
-          getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(propertyKey);
+          getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
           if (isResultSetAvailable) {
             resultSet = statement.getResultSet();
 
@@ -771,7 +778,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
         }
       }
     } catch (Throwable e) {
-      logger.error("Cannot run " + sql, e);
+      LOGGER.error("Cannot run " + sql, e);
       return new InterpreterResult(Code.ERROR,  ExceptionUtils.getStackTrace(e));
     } finally {
       //In case user ran an insert/update/upsert statement
@@ -841,11 +848,11 @@ public class JDBCInterpreter extends KerberosInterpreter {
   @Override
   public InterpreterResult internalInterpret(String cmd, InterpreterContext context)
           throws InterpreterException {
-    logger.debug("Run SQL command '{}'", cmd);
-    String propertyKey = getPropertyKey(context);
-    logger.debug("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
+    LOGGER.debug("Run SQL command '{}'", cmd);
+    String dbPrefix = getDBPrefix(context);
+    LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd);
     if (!isRefreshMode(context)) {
-      return executeSql(propertyKey, cmd.trim(), context);
+      return executeSql(dbPrefix, cmd.trim(), context);
     } else {
       int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval"));
       final String code = cmd.trim();
@@ -857,14 +864,14 @@ public class JDBCInterpreter extends KerberosInterpreter {
       refreshExecutor.scheduleAtFixedRate(() -> {
         context.out.clear(false);
         try {
-          InterpreterResult result = executeSql(propertyKey, code, context);
+          InterpreterResult result = executeSql(dbPrefix, code, context);
           context.out.flush();
           interpreterResultRef.set(result);
           if (result.code() != Code.SUCCESS) {
             refreshExecutor.shutdownNow();
           }
         } catch (Exception e) {
-          logger.warn("Fail to run sql", e);
+          LOGGER.warn("Fail to run sql", e);
         }
       }, 0, refreshInterval, TimeUnit.MILLISECONDS);
 
@@ -872,7 +879,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
-          logger.error("");
+          LOGGER.error("");
         }
       }
       refreshExecutorServices.remove(context.getParagraphId());
@@ -890,7 +897,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   public void cancel(InterpreterContext context) {
 
     if (isRefreshMode(context)) {
-      logger.info("Shutdown refreshExecutorService for paragraph: " + context.getParagraphId());
+      LOGGER.info("Shutdown refreshExecutorService for paragraph: {}", context.getParagraphId());
       ScheduledExecutorService executorService =
               refreshExecutorServices.get(context.getParagraphId());
       if (executorService != null) {
@@ -900,19 +907,25 @@ public class JDBCInterpreter extends KerberosInterpreter {
       return;
     }
 
-    logger.info("Cancel current query statement.");
+    LOGGER.info("Cancel current query statement.");
     String paragraphId = context.getParagraphId();
     JDBCUserConfigurations jdbcUserConfigurations =
             getJDBCConfiguration(context.getAuthenticationInfo().getUser());
     try {
       jdbcUserConfigurations.cancelStatement(paragraphId);
     } catch (SQLException e) {
-      logger.error("Error while cancelling...", e);
+      LOGGER.error("Error while cancelling...", e);
     }
   }
 
-  public String getPropertyKey(InterpreterContext interpreterContext) {
-    Map<String, String> localProperties = interpreterContext.getLocalProperties();
+  /**
+   * Resolve the db prefix (property group) targeted by this paragraph,
+   * e.g. %jdbc(db=mysql) resolves to "mysql".
+   *
+   * @param context the interpreter context of the current paragraph
+   * @return the db prefix to use
+   */
+  public String getDBPrefix(InterpreterContext context) {
+    Map<String, String> localProperties = context.getLocalProperties();
     // It is recommended to use this kind of format: %jdbc(db=mysql)
     if (localProperties.containsKey("db")) {
       return localProperties.get("db");
@@ -949,7 +962,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   public List<InterpreterCompletion> completion(String buf, int cursor,
       InterpreterContext interpreterContext) throws InterpreterException {
     List<InterpreterCompletion> candidates = new ArrayList<>();
-    String propertyKey = getPropertyKey(interpreterContext);
+    String propertyKey = getDBPrefix(interpreterContext);
     String sqlCompleterKey =
         String.format("%s.%s", interpreterContext.getAuthenticationInfo().getUser(), propertyKey);
     SqlCompleter sqlCompleter = sqlCompletersMap.get(sqlCompleterKey);
@@ -960,7 +973,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
         connection = getConnection(propertyKey, interpreterContext);
       }
     } catch (ClassNotFoundException | SQLException | IOException e) {
-      logger.warn("SQLCompleter will created without use connection");
+      LOGGER.warn("SQLCompleter will be created without using connection");
     }
 
     sqlCompleter = createOrUpdateSqlCompleter(sqlCompleter, connection, propertyKey, buf, cursor);
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
index 4eac9fc..223b85b 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
@@ -29,8 +29,11 @@ import org.apache.zeppelin.user.UsernamePassword;
  */
 public class JDBCUserConfigurations {
   private final Map<String, Statement> paragraphIdStatementMap;
+  // dbPrefix --> PoolingDriver
   private final Map<String, PoolingDriver> poolingDriverMap;
+  // dbPrefix --> Properties
   private final HashMap<String, Properties> propertiesMap;
+  // dbPrefix --> Boolean
   private HashMap<String, Boolean> isSuccessful;
 
   public JDBCUserConfigurations() {
@@ -52,40 +55,40 @@ public class JDBCUserConfigurations {
     isSuccessful.clear();
   }
 
-  public void setPropertyMap(String key, Properties properties) {
+  public void setPropertyMap(String dbPrefix, Properties properties) {
     Properties p = (Properties) properties.clone();
-    propertiesMap.put(key, p);
+    propertiesMap.put(dbPrefix, p);
   }
 
   public Properties getPropertyMap(String key) {
     return propertiesMap.get(key);
   }
 
-  public void cleanUserProperty(String propertyKey) {
-    propertiesMap.get(propertyKey).remove("user");
-    propertiesMap.get(propertyKey).remove("password");
+  public void cleanUserProperty(String dbPrefix) {
+    propertiesMap.get(dbPrefix).remove("user");
+    propertiesMap.get(dbPrefix).remove("password");
   }
 
-  public void setUserProperty(String propertyKey, UsernamePassword usernamePassword) {
-    propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername());
-    propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword());
+  public void setUserProperty(String dbPrefix, UsernamePassword usernamePassword) {
+    propertiesMap.get(dbPrefix).setProperty("user", usernamePassword.getUsername());
+    propertiesMap.get(dbPrefix).setProperty("password", usernamePassword.getPassword());
   }
 
-  public void saveStatement(String key, Statement statement) throws SQLException {
-    paragraphIdStatementMap.put(key, statement);
+  public void saveStatement(String paragraphId, Statement statement) throws SQLException {
+    paragraphIdStatementMap.put(paragraphId, statement);
   }
 
-  public void cancelStatement(String key) throws SQLException {
-    paragraphIdStatementMap.get(key).cancel();
+  public void cancelStatement(String paragraphId) throws SQLException {
+    paragraphIdStatementMap.get(paragraphId).cancel();
   }
 
-  public void removeStatement(String key) {
-    paragraphIdStatementMap.remove(key);
+  public void removeStatement(String paragraphId) {
+    paragraphIdStatementMap.remove(paragraphId);
   }
 
-  public void saveDBDriverPool(String key, PoolingDriver driver) throws SQLException {
-    poolingDriverMap.put(key, driver);
-    isSuccessful.put(key, false);
+  public void saveDBDriverPool(String dbPrefix, PoolingDriver driver) throws SQLException {
+    poolingDriverMap.put(dbPrefix, driver);
+    isSuccessful.put(dbPrefix, false);
   }
   public PoolingDriver removeDBDriverPool(String key) throws SQLException {
     isSuccessful.remove(key);
@@ -96,14 +99,7 @@ public class JDBCUserConfigurations {
     return poolingDriverMap.containsKey(key);
   }
 
-  public void setConnectionInDBDriverPoolSuccessful(String key) {
-    isSuccessful.put(key, true);
-  }
-
-  public boolean isConnectionInDBDriverPoolSuccessful(String key) {
-    if (isSuccessful.containsKey(key)) {
-      return isSuccessful.get(key);
-    }
-    return false;
+  public void setConnectionInDBDriverPoolSuccessful(String dbPrefix) {
+    isSuccessful.put(dbPrefix, true);
   }
 }
diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json
index c84056f..5aac46e 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -25,13 +25,6 @@
         "description": "The JDBC user password",
         "type": "password"
       },
-      "default.completer.ttlInSeconds": {
-        "envName": null,
-        "propertyName": "default.completer.ttlInSeconds",
-        "defaultValue": "120",
-        "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)",
-        "type": "number"
-      },
       "default.driver": {
         "envName": null,
         "propertyName": "default.driver",
@@ -39,6 +32,13 @@
         "description": "JDBC Driver Name",
         "type": "string"
       },
+      "default.completer.ttlInSeconds": {
+        "envName": null,
+        "propertyName": "default.completer.ttlInSeconds",
+        "defaultValue": "120",
+        "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)",
+        "type": "number"
+      },
       "default.completer.schemaFilters": {
         "envName": null,
         "propertyName": "default.completer.schemaFilters",
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index f6fe108..92e0c24 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -14,11 +14,12 @@
  */
 package org.apache.zeppelin.jdbc;
 
-import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
-import net.jodah.concurrentunit.Waiter;
+
 import org.apache.zeppelin.completer.CompletionType;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
@@ -45,6 +46,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
+import net.jodah.concurrentunit.Waiter;
+
 import static java.lang.String.format;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
@@ -55,12 +59,13 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.PRECODE_KEY_TEMPLATE;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.STATEMENT_PRECODE_KEY_TEMPLATE;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+
 /**
  * JDBC interpreter unit tests.
  */
@@ -116,21 +121,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     InterpreterContext interpreterContext = InterpreterContext.builder()
         .setLocalProperties(localProperties)
         .build();
-    assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getPropertyKey(interpreterContext));
+    assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getDBPrefix(interpreterContext));
 
     localProperties = new HashMap<>();
     localProperties.put("db", "mysql");
     interpreterContext = InterpreterContext.builder()
         .setLocalProperties(localProperties)
         .build();
-    assertEquals("mysql", t.getPropertyKey(interpreterContext));
+    assertEquals("mysql", t.getDBPrefix(interpreterContext));
 
     localProperties = new HashMap<>();
     localProperties.put("hive", "hive");
     interpreterContext = InterpreterContext.builder()
         .setLocalProperties(localProperties)
         .build();
-    assertEquals("hive", t.getPropertyKey(interpreterContext));
+    assertEquals("hive", t.getDBPrefix(interpreterContext));
   }
 
   @Test
@@ -270,7 +275,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(interpreterResult.toString(),
+            InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("SOME_OTHER_NAME\na_name\n", resultMessages.get(0).getData());
   }
@@ -491,17 +497,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     assertEquals(true, completionList.contains(correctCompletionKeyword));
   }
 
-  private Properties getDBProperty(String dbUser,
+  private Properties getDBProperty(String dbPrefix,
+                                   String dbUser,
                                    String dbPassowrd) throws IOException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
-    properties.setProperty("default.driver", "org.h2.Driver");
-    properties.setProperty("default.url", getJdbcConnection());
-    if (dbUser != null) {
+    if (!StringUtils.isBlank(dbPrefix)) {
+      properties.setProperty(dbPrefix + ".driver", "org.h2.Driver");
+      properties.setProperty(dbPrefix + ".url", getJdbcConnection());
+      properties.setProperty(dbPrefix + ".user", dbUser);
+      properties.setProperty(dbPrefix + ".password", dbPassowrd);
+    } else {
+      properties.setProperty("default.driver", "org.h2.Driver");
+      properties.setProperty("default.url", getJdbcConnection());
       properties.setProperty("default.user", dbUser);
-    }
-    if (dbPassowrd != null) {
       properties.setProperty("default.password", dbPassowrd);
     }
     return properties;
@@ -521,75 +531,95 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testMultiTenant() throws IOException, InterpreterException {
-    /*
-     * assume that the database user is 'dbuser' and password is 'dbpassword'
-     * 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property
-     * 'jdbc2' interpreter doesn't have user/password property
-     * 'user1' doesn't have Credential information.
-     * 'user2' has 'jdbc2' Credential information that is 'user2Id' / 'user2Pw' as id and password
-     */
-
-    JDBCInterpreter jdbc1 = new JDBCInterpreter(getDBProperty("dbuser", "dbpassword"));
-    JDBCInterpreter jdbc2 = new JDBCInterpreter(getDBProperty("", ""));
-
+  public void testMultiTenant_1() throws IOException, InterpreterException {
+    // user1 %jdbc  select from default db
+    // user2 %jdbc  select from default db
+    // user2 %jdbc  select from hive db
+    Properties properties = getDBProperty("default", "dbuser", "dbpassword");
+    properties.putAll(getDBProperty("hive", "", ""));
+
+    JDBCInterpreter jdbc = new JDBCInterpreter(properties);
     AuthenticationInfo user1Credential = getUserAuth("user1", null, null, null);
-    AuthenticationInfo user2Credential = getUserAuth("user2", "jdbc.jdbc2", "user2Id", "user2Pw");
+    AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw");
+    jdbc.open();
 
-    // user1 runs jdbc1
-    jdbc1.open();
-    InterpreterContext ctx1 = InterpreterContext.builder()
-        .setAuthenticationInfo(user1Credential)
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setReplName("jdbc1")
-        .build();
-    jdbc1.interpret("", ctx1);
+    // user1 runs default
+    InterpreterContext context = InterpreterContext.builder()
+            .setAuthenticationInfo(user1Credential)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setReplName("jdbc")
+            .build();
+    jdbc.interpret("", context);
 
-    JDBCUserConfigurations user1JDBC1Conf = jdbc1.getJDBCConfiguration("user1");
+    JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1");
     assertEquals("dbuser", user1JDBC1Conf.getPropertyMap("default").get("user"));
     assertEquals("dbpassword", user1JDBC1Conf.getPropertyMap("default").get("password"));
-    jdbc1.close();
-
-    // user1 runs jdbc2
-    jdbc2.open();
-    InterpreterContext ctx2 = InterpreterContext.builder()
-        .setAuthenticationInfo(user1Credential)
-        .setReplName("jdbc2")
-        .build();
-    jdbc2.interpret("", ctx2);
-
-    JDBCUserConfigurations user1JDBC2Conf = jdbc2.getJDBCConfiguration("user1");
-    assertNull(user1JDBC2Conf.getPropertyMap("default").get("user"));
-    assertNull(user1JDBC2Conf.getPropertyMap("default").get("password"));
-    jdbc2.close();
 
-    // user2 runs jdbc1
-    jdbc1.open();
-    InterpreterContext ctx3 = InterpreterContext.builder()
+    // user2 runs default
+    context = InterpreterContext.builder()
         .setAuthenticationInfo(user2Credential)
         .setInterpreterOut(new InterpreterOutput(null))
-        .setReplName("jdbc1")
+        .setReplName("jdbc")
         .build();
-    jdbc1.interpret("", ctx3);
+    jdbc.interpret("", context);
 
-    JDBCUserConfigurations user2JDBC1Conf = jdbc1.getJDBCConfiguration("user2");
+    JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
     assertEquals("dbuser", user2JDBC1Conf.getPropertyMap("default").get("user"));
     assertEquals("dbpassword", user2JDBC1Conf.getPropertyMap("default").get("password"));
-    jdbc1.close();
 
-    // user2 runs jdbc2
-    jdbc2.open();
-    InterpreterContext ctx4 = InterpreterContext.builder()
-        .setAuthenticationInfo(user2Credential)
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setReplName("jdbc2")
-        .build();
-    jdbc2.interpret("", ctx4);
+    // user2 runs hive
+    Map<String, String> localProperties = new HashMap<>();
+    localProperties.put("db", "hive");
+    context = InterpreterContext.builder()
+            .setAuthenticationInfo(user2Credential)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setLocalProperties(localProperties)
+            .setReplName("jdbc")
+            .build();
+    jdbc.interpret("", context);
+
+    user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
+    assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("hive").get("user"));
+    assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("hive").get("password"));
+
+    jdbc.close();
+  }
+
+  @Test
+  public void testMultiTenant_2() throws IOException, InterpreterException {
+    // user1 %hive  select from default db
+    // user2 %hive  select from default db
+    Properties properties = getDBProperty("default", "", "");
+    JDBCInterpreter jdbc = new JDBCInterpreter(properties);
+    AuthenticationInfo user1Credential = getUserAuth("user1", "hive", "user1Id", "user1Pw");
+    AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw");
+    jdbc.open();
+
+    // user1 runs default
+    InterpreterContext context = InterpreterContext.builder()
+            .setAuthenticationInfo(user1Credential)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setReplName("hive")
+            .build();
+    jdbc.interpret("", context);
+
+    JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1");
+    assertEquals("user1Id", user1JDBC1Conf.getPropertyMap("default").get("user"));
+    assertEquals("user1Pw", user1JDBC1Conf.getPropertyMap("default").get("password"));
+
+    // user2 runs default
+    context = InterpreterContext.builder()
+            .setAuthenticationInfo(user2Credential)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setReplName("hive")
+            .build();
+    jdbc.interpret("", context);
+
+    JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
+    assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("default").get("user"));
+    assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("default").get("password"));
 
-    JDBCUserConfigurations user2JDBC2Conf = jdbc2.getJDBCConfiguration("user2");
-    assertEquals("user2Id", user2JDBC2Conf.getPropertyMap("default").get("user"));
-    assertEquals("user2Pw", user2JDBC2Conf.getPropertyMap("default").get("password"));
-    jdbc2.close();
+    jdbc.close();
   }
 
   @Test
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 30d3330..e8c02ac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -501,10 +501,13 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   private InterpreterContext getInterpreterContext() {
     AngularObjectRegistry registry = null;
     ResourcePool resourcePool = null;
-
+    String replName = null;
     if (this.interpreter != null) {
       registry = this.interpreter.getInterpreterGroup().getAngularObjectRegistry();
       resourcePool = this.interpreter.getInterpreterGroup().getResourcePool();
+      InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
+              interpreter.getInterpreterGroup()).getInterpreterSetting();
+      replName = interpreterSetting.getName();
     }
 
     Credentials credentials = note.getCredentials();
@@ -523,7 +526,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
             .setNoteId(note.getId())
             .setNoteName(note.getName())
             .setParagraphId(getId())
-            .setReplName(intpText)
+            .setReplName(replName)
             .setParagraphTitle(title)
             .setParagraphText(text)
             .setAuthenticationInfo(subject)