You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/30 15:33:18 UTC
[zeppelin] branch master updated: [ZEPPELIN-4863]. User credential
usage in jdbc interpreter doesn't work when the interpreter name is not
jdbc
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 55c1d27 [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
55c1d27 is described below
commit 55c1d276746b9bbf5458ca92cb55b2aeb0bae595
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 8 10:02:45 2020 +0800
[ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
### What is this PR for?
User credentials only work in the default jdbc interpreter. If a user creates a new jdbc interpreter, e.g. hive, then the credentials won't work. This PR fixes this and also does some code refactoring to make the jdbc interpreter module more readable.
### What type of PR is it?
[Bug Fix | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4863
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3806 from zjffdu/ZEPPELIN-4863 and squashes the following commits:
9fd10eeb2 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
abb00b376 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc
---
.../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 193 +++++++++++----------
.../zeppelin/jdbc/JDBCUserConfigurations.java | 48 +++--
jdbc/src/main/resources/interpreter-setting.json | 14 +-
.../apache/zeppelin/jdbc/JDBCInterpreterTest.java | 166 ++++++++++--------
.../org/apache/zeppelin/notebook/Paragraph.java | 7 +-
5 files changed, 235 insertions(+), 193 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index bdda696..5dcf4d0 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -97,7 +97,7 @@ import org.apache.zeppelin.user.UsernamePassword;
* </p>
*/
public class JDBCInterpreter extends KerberosInterpreter {
- private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JDBCInterpreter.class);
static final String INTERPRETER_NAME = "jdbc";
static final String COMMON_KEY = "common";
@@ -152,7 +152,9 @@ public class JDBCInterpreter extends KerberosInterpreter {
"KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath",
"extraCredentials", "roles", "sessionProperties"));
+ // database --> Properties
private final HashMap<String, Properties> basePropertiesMap;
+ // username --> User Configuration
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
private final HashMap<String, SqlCompleter> sqlCompletersMap;
@@ -189,23 +191,19 @@ public class JDBCInterpreter extends KerberosInterpreter {
return true;
}
} catch (Exception e) {
- logger.error("Unable to run kinit for zeppelin", e);
+ LOGGER.error("Unable to run kinit for zeppelin", e);
}
return false;
}
- public HashMap<String, Properties> getPropertiesMap() {
- return basePropertiesMap;
- }
-
@Override
public void open() {
super.open();
for (String propertyKey : properties.stringPropertyNames()) {
- logger.debug("propertyKey: {}", propertyKey);
+ LOGGER.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
- logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
+ LOGGER.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
Properties prefixProperties;
if (basePropertiesMap.containsKey(keyValue[0])) {
@@ -223,7 +221,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
if (!COMMON_KEY.equals(key)) {
Properties properties = basePropertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
- logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
+ LOGGER.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
removeKeySet.add(key);
}
@@ -233,7 +231,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
for (String key : removeKeySet) {
basePropertiesMap.remove(key);
}
- logger.debug("JDBC PropretiesMap: {}", basePropertiesMap);
+ LOGGER.debug("JDBC PropertiesMap: {}", basePropertiesMap);
setMaxLineResults();
setMaxRows();
@@ -295,12 +293,12 @@ public class JDBCInterpreter extends KerberosInterpreter {
// protection to release connection
executorService.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.warn("Completion timeout", e);
+ LOGGER.warn("Completion timeout", e);
if (connection != null) {
try {
connection.close();
} catch (SQLException e1) {
- logger.warn("Error close connection", e1);
+ LOGGER.warn("Error close connection", e1);
}
}
}
@@ -312,7 +310,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
try {
configurations.initStatementMap();
} catch (Exception e) {
- logger.error("Error while closing paragraphIdStatementMap statement...", e);
+ LOGGER.error("Error while closing paragraphIdStatementMap statement...", e);
}
}
}
@@ -322,13 +320,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
try {
closeDBPool(key, DEFAULT_KEY);
} catch (SQLException e) {
- logger.error("Error while closing database pool.", e);
+ LOGGER.error("Error while closing database pool.", e);
}
try {
JDBCUserConfigurations configurations = jdbcUserConfigurationsMap.get(key);
configurations.initConnectionPoolMap();
} catch (SQLException e) {
- logger.error("Error while closing initConnectionPoolMap.", e);
+ LOGGER.error("Error while closing initConnectionPoolMap.", e);
}
}
}
@@ -340,22 +338,22 @@ public class JDBCInterpreter extends KerberosInterpreter {
initStatementMap();
initConnectionPoolMap();
} catch (Exception e) {
- logger.error("Error while closing...", e);
+ LOGGER.error("Error while closing...", e);
}
}
- private String getEntityName(String replName) {
- StringBuffer entityName = new StringBuffer();
- entityName.append(INTERPRETER_NAME);
- entityName.append(".");
- entityName.append(replName);
- return entityName.toString();
+ private String getEntityName(String replName, String propertyKey) {
+ if ("jdbc".equals(replName)) {
+ return propertyKey;
+ } else {
+ return replName;
+ }
}
- private String getJDBCDriverName(String user, String propertyKey) {
+ private String getJDBCDriverName(String user, String dbPrefix) {
StringBuffer driverName = new StringBuffer();
driverName.append(DBCP_STRING);
- driverName.append(propertyKey);
+ driverName.append(dbPrefix);
driverName.append(user);
return driverName.toString();
}
@@ -367,10 +365,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
}
private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
- String replName) {
+ String entity) {
UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials();
if (uc != null) {
- return uc.getUsernamePassword(replName);
+ return uc.getUsernamePassword(entity);
}
return null;
}
@@ -393,36 +391,36 @@ public class JDBCInterpreter extends KerberosInterpreter {
}
}
- private void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
+ private void setUserProperty(String dbPrefix, InterpreterContext context)
throws SQLException, IOException, InterpreterException {
- String user = interpreterContext.getAuthenticationInfo().getUser();
+ String user = context.getAuthenticationInfo().getUser();
JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
- if (basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
- !basePropertiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
- String password = getPassword(basePropertiesMap.get(propertyKey));
+ if (basePropertiesMap.get(dbPrefix).containsKey(USER_KEY) &&
+ !basePropertiesMap.get(dbPrefix).getProperty(USER_KEY).isEmpty()) {
+ String password = getPassword(basePropertiesMap.get(dbPrefix));
if (!isEmpty(password)) {
- basePropertiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
+ basePropertiesMap.get(dbPrefix).setProperty(PASSWORD_KEY, password);
}
}
- jdbcUserConfigurations.setPropertyMap(propertyKey, basePropertiesMap.get(propertyKey));
- if (existAccountInBaseProperty(propertyKey)) {
+ jdbcUserConfigurations.setPropertyMap(dbPrefix, basePropertiesMap.get(dbPrefix));
+ if (existAccountInBaseProperty(dbPrefix)) {
return;
}
- jdbcUserConfigurations.cleanUserProperty(propertyKey);
+ jdbcUserConfigurations.cleanUserProperty(dbPrefix);
- UsernamePassword usernamePassword = getUsernamePassword(interpreterContext,
- getEntityName(interpreterContext.getReplName()));
+ UsernamePassword usernamePassword = getUsernamePassword(context,
+ getEntityName(context.getReplName(), dbPrefix));
if (usernamePassword != null) {
- jdbcUserConfigurations.setUserProperty(propertyKey, usernamePassword);
+ jdbcUserConfigurations.setUserProperty(dbPrefix, usernamePassword);
} else {
- closeDBPool(user, propertyKey);
+ closeDBPool(user, dbPrefix);
}
}
- private void createConnectionPool(String url, String user, String propertyKey,
- Properties properties) throws SQLException, ClassNotFoundException, IOException {
+ private void createConnectionPool(String url, String user, String dbPrefix,
+ Properties properties) throws SQLException, ClassNotFoundException {
String driverClass = properties.getProperty(DRIVER_KEY);
if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver")
@@ -449,62 +447,62 @@ public class JDBCInterpreter extends KerberosInterpreter {
poolableConnectionFactory.setPool(connectionPool);
Class.forName(driverClass);
PoolingDriver driver = new PoolingDriver();
- driver.registerPool(propertyKey + user, connectionPool);
- getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver);
+ driver.registerPool(dbPrefix + user, connectionPool);
+ getJDBCConfiguration(user).saveDBDriverPool(dbPrefix, driver);
}
- private Connection getConnectionFromPool(String url, String user, String propertyKey,
- Properties properties) throws SQLException, ClassNotFoundException, IOException {
- String jdbcDriver = getJDBCDriverName(user, propertyKey);
+ private Connection getConnectionFromPool(String url, String user, String dbPrefix,
+ Properties properties) throws SQLException, ClassNotFoundException {
+ String jdbcDriver = getJDBCDriverName(user, dbPrefix);
- if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
- createConnectionPool(url, user, propertyKey, properties);
+ if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(dbPrefix)) {
+ createConnectionPool(url, user, dbPrefix, properties);
}
return DriverManager.getConnection(jdbcDriver);
}
- public Connection getConnection(String propertyKey, InterpreterContext interpreterContext)
+ public Connection getConnection(String dbPrefix, InterpreterContext context)
throws ClassNotFoundException, SQLException, InterpreterException, IOException {
- final String user = interpreterContext.getAuthenticationInfo().getUser();
+ final String user = context.getAuthenticationInfo().getUser();
Connection connection;
- if (propertyKey == null || basePropertiesMap.get(propertyKey) == null) {
+ if (dbPrefix == null || basePropertiesMap.get(dbPrefix) == null) {
return null;
}
JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
- setUserProperty(propertyKey, interpreterContext);
+ setUserProperty(dbPrefix, context);
- final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
+ final Properties properties = jdbcUserConfigurations.getPropertyMap(dbPrefix);
final String url = properties.getProperty(URL_KEY);
if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
- connection = getConnectionFromPool(url, user, propertyKey, properties);
+ connection = getConnectionFromPool(url, user, dbPrefix, properties);
} else {
UserGroupInformation.AuthenticationMethod authType =
JDBCSecurityImpl.getAuthtype(getProperties());
- final String connectionUrl = appendProxyUserToURL(url, user, propertyKey);
+ final String connectionUrl = appendProxyUserToURL(url, user, dbPrefix);
JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType);
switch (authType) {
case KERBEROS:
if (user == null || "false".equalsIgnoreCase(
getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
- connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+ connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
} else {
- if (basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {
- connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+ if (basePropertiesMap.get(dbPrefix).containsKey("proxy.user.property")) {
+ connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(
user, UserGroupInformation.getCurrentUser());
} catch (Exception e) {
- logger.error("Error in getCurrentUser", e);
+ LOGGER.error("Error in getCurrentUser", e);
throw new InterpreterException("Error in getCurrentUser", e);
}
- final String poolKey = propertyKey;
+ final String poolKey = dbPrefix;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
@@ -513,7 +511,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
}
});
} catch (Exception e) {
- logger.error("Error in doAs", e);
+ LOGGER.error("Error in doAs", e);
throw new InterpreterException("Error in doAs", e);
}
}
@@ -521,7 +519,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
break;
default:
- connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
+ connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties);
}
}
@@ -538,13 +536,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
if (lastIndexOfUrl == -1) {
lastIndexOfUrl = connectionUrl.length();
}
- logger.info("Using proxy user as :" + user);
- logger.info("Using proxy property for user as :" +
+ LOGGER.info("Using proxy user as: {}", user);
+ LOGGER.info("Using proxy property for user as: {}",
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property"));
connectionUrl.insert(lastIndexOfUrl, ";" +
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
} else if (user != null && !user.equals("anonymous") && url.contains("hive")) {
- logger.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
+ LOGGER.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
".org/docs/latest/interpreter/jdbc.html#apache-hive");
}
@@ -570,9 +568,9 @@ public class JDBCInterpreter extends KerberosInterpreter {
+ properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY));
}
} catch (Exception e) {
- logger.error("Failed to retrieve password from JCEKS \n" +
- "For file: " + properties.getProperty(JDBC_JCEKS_FILE) +
- "\nFor key: " + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e);
+ LOGGER.error("Failed to retrieve password from JCEKS \n" +
+ "For file: {} \nFor key: {}", properties.getProperty(JDBC_JCEKS_FILE),
+ properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e);
throw e;
}
}
@@ -661,7 +659,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
return sqlSplitter.splitSql(text);
}
- private InterpreterResult executeSql(String propertyKey, String sql,
+ /**
+ * Execute the sql statement under this dbPrefix.
+ *
+ * @param dbPrefix
+ * @param sql
+ * @param context
+ * @return
+ * @throws InterpreterException
+ */
+ private InterpreterResult executeSql(String dbPrefix, String sql,
InterpreterContext context) throws InterpreterException {
Connection connection = null;
Statement statement;
@@ -670,13 +677,13 @@ public class JDBCInterpreter extends KerberosInterpreter {
String user = context.getAuthenticationInfo().getUser();
try {
- connection = getConnection(propertyKey, context);
+ connection = getConnection(dbPrefix, context);
} catch (Exception e) {
String errorMsg = ExceptionUtils.getStackTrace(e);
try {
- closeDBPool(user, propertyKey);
+ closeDBPool(user, dbPrefix);
} catch (SQLException e1) {
- logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
+ LOGGER.error("Cannot close DBPool for user, dbPrefix: " + user + dbPrefix, e1);
}
try {
context.out.write(errorMsg);
@@ -706,20 +713,20 @@ public class JDBCInterpreter extends KerberosInterpreter {
getJDBCConfiguration(user).saveStatement(paragraphId, statement);
String statementPrecode =
- getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, propertyKey));
+ getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, dbPrefix));
if (StringUtils.isNotBlank(statementPrecode)) {
statement.execute(statementPrecode);
}
// start hive monitor thread if it is hive jdbc
- if (getJDBCConfiguration(user).getPropertyMap(propertyKey).getProperty(URL_KEY)
+ if (getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY)
.startsWith("jdbc:hive2://")) {
HiveUtils.startHiveMonitorThread(statement, context,
Boolean.parseBoolean(getProperty("hive.log.display", "true")));
}
boolean isResultSetAvailable = statement.execute(sqlToExecute);
- getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(propertyKey);
+ getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
if (isResultSetAvailable) {
resultSet = statement.getResultSet();
@@ -771,7 +778,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
}
}
} catch (Throwable e) {
- logger.error("Cannot run " + sql, e);
+ LOGGER.error("Cannot run " + sql, e);
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
//In case user ran an insert/update/upsert statement
@@ -841,11 +848,11 @@ public class JDBCInterpreter extends KerberosInterpreter {
@Override
public InterpreterResult internalInterpret(String cmd, InterpreterContext context)
throws InterpreterException {
- logger.debug("Run SQL command '{}'", cmd);
- String propertyKey = getPropertyKey(context);
- logger.debug("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
+ LOGGER.debug("Run SQL command '{}'", cmd);
+ String dbPrefix = getDBPrefix(context);
+ LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd);
if (!isRefreshMode(context)) {
- return executeSql(propertyKey, cmd.trim(), context);
+ return executeSql(dbPrefix, cmd.trim(), context);
} else {
int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval"));
final String code = cmd.trim();
@@ -857,14 +864,14 @@ public class JDBCInterpreter extends KerberosInterpreter {
refreshExecutor.scheduleAtFixedRate(() -> {
context.out.clear(false);
try {
- InterpreterResult result = executeSql(propertyKey, code, context);
+ InterpreterResult result = executeSql(dbPrefix, code, context);
context.out.flush();
interpreterResultRef.set(result);
if (result.code() != Code.SUCCESS) {
refreshExecutor.shutdownNow();
}
} catch (Exception e) {
- logger.warn("Fail to run sql", e);
+ LOGGER.warn("Fail to run sql", e);
}
}, 0, refreshInterval, TimeUnit.MILLISECONDS);
@@ -872,7 +879,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- logger.error("");
+ LOGGER.error("");
}
}
refreshExecutorServices.remove(context.getParagraphId());
@@ -890,7 +897,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
public void cancel(InterpreterContext context) {
if (isRefreshMode(context)) {
- logger.info("Shutdown refreshExecutorService for paragraph: " + context.getParagraphId());
+ LOGGER.info("Shutdown refreshExecutorService for paragraph: {}", context.getParagraphId());
ScheduledExecutorService executorService =
refreshExecutorServices.get(context.getParagraphId());
if (executorService != null) {
@@ -900,19 +907,25 @@ public class JDBCInterpreter extends KerberosInterpreter {
return;
}
- logger.info("Cancel current query statement.");
+ LOGGER.info("Cancel current query statement.");
String paragraphId = context.getParagraphId();
JDBCUserConfigurations jdbcUserConfigurations =
getJDBCConfiguration(context.getAuthenticationInfo().getUser());
try {
jdbcUserConfigurations.cancelStatement(paragraphId);
} catch (SQLException e) {
- logger.error("Error while cancelling...", e);
+ LOGGER.error("Error while cancelling...", e);
}
}
- public String getPropertyKey(InterpreterContext interpreterContext) {
- Map<String, String> localProperties = interpreterContext.getLocalProperties();
+ /**
+ *
+ *
+ * @param context
+ * @return
+ */
+ public String getDBPrefix(InterpreterContext context) {
+ Map<String, String> localProperties = context.getLocalProperties();
// It is recommended to use this kind of format: %jdbc(db=mysql)
if (localProperties.containsKey("db")) {
return localProperties.get("db");
@@ -949,7 +962,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) throws InterpreterException {
List<InterpreterCompletion> candidates = new ArrayList<>();
- String propertyKey = getPropertyKey(interpreterContext);
+ String propertyKey = getDBPrefix(interpreterContext);
String sqlCompleterKey =
String.format("%s.%s", interpreterContext.getAuthenticationInfo().getUser(), propertyKey);
SqlCompleter sqlCompleter = sqlCompletersMap.get(sqlCompleterKey);
@@ -960,7 +973,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
connection = getConnection(propertyKey, interpreterContext);
}
} catch (ClassNotFoundException | SQLException | IOException e) {
- logger.warn("SQLCompleter will created without use connection");
+ LOGGER.warn("SQLCompleter will created without use connection");
}
sqlCompleter = createOrUpdateSqlCompleter(sqlCompleter, connection, propertyKey, buf, cursor);
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
index 4eac9fc..223b85b 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java
@@ -29,8 +29,11 @@ import org.apache.zeppelin.user.UsernamePassword;
*/
public class JDBCUserConfigurations {
private final Map<String, Statement> paragraphIdStatementMap;
+ // dbPrefix --> PoolingDriver
private final Map<String, PoolingDriver> poolingDriverMap;
+ // dbPrefix --> Properties
private final HashMap<String, Properties> propertiesMap;
+ // dbPrefix --> Boolean
private HashMap<String, Boolean> isSuccessful;
public JDBCUserConfigurations() {
@@ -52,40 +55,40 @@ public class JDBCUserConfigurations {
isSuccessful.clear();
}
- public void setPropertyMap(String key, Properties properties) {
+ public void setPropertyMap(String dbPrefix, Properties properties) {
Properties p = (Properties) properties.clone();
- propertiesMap.put(key, p);
+ propertiesMap.put(dbPrefix, p);
}
public Properties getPropertyMap(String key) {
return propertiesMap.get(key);
}
- public void cleanUserProperty(String propertyKey) {
- propertiesMap.get(propertyKey).remove("user");
- propertiesMap.get(propertyKey).remove("password");
+ public void cleanUserProperty(String dfPrefix) {
+ propertiesMap.get(dfPrefix).remove("user");
+ propertiesMap.get(dfPrefix).remove("password");
}
- public void setUserProperty(String propertyKey, UsernamePassword usernamePassword) {
- propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername());
- propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword());
+ public void setUserProperty(String dbPrefix, UsernamePassword usernamePassword) {
+ propertiesMap.get(dbPrefix).setProperty("user", usernamePassword.getUsername());
+ propertiesMap.get(dbPrefix).setProperty("password", usernamePassword.getPassword());
}
- public void saveStatement(String key, Statement statement) throws SQLException {
- paragraphIdStatementMap.put(key, statement);
+ public void saveStatement(String paragraphId, Statement statement) throws SQLException {
+ paragraphIdStatementMap.put(paragraphId, statement);
}
- public void cancelStatement(String key) throws SQLException {
- paragraphIdStatementMap.get(key).cancel();
+ public void cancelStatement(String paragraphId) throws SQLException {
+ paragraphIdStatementMap.get(paragraphId).cancel();
}
- public void removeStatement(String key) {
- paragraphIdStatementMap.remove(key);
+ public void removeStatement(String paragraphId) {
+ paragraphIdStatementMap.remove(paragraphId);
}
- public void saveDBDriverPool(String key, PoolingDriver driver) throws SQLException {
- poolingDriverMap.put(key, driver);
- isSuccessful.put(key, false);
+ public void saveDBDriverPool(String dbPrefix, PoolingDriver driver) throws SQLException {
+ poolingDriverMap.put(dbPrefix, driver);
+ isSuccessful.put(dbPrefix, false);
}
public PoolingDriver removeDBDriverPool(String key) throws SQLException {
isSuccessful.remove(key);
@@ -96,14 +99,7 @@ public class JDBCUserConfigurations {
return poolingDriverMap.containsKey(key);
}
- public void setConnectionInDBDriverPoolSuccessful(String key) {
- isSuccessful.put(key, true);
- }
-
- public boolean isConnectionInDBDriverPoolSuccessful(String key) {
- if (isSuccessful.containsKey(key)) {
- return isSuccessful.get(key);
- }
- return false;
+ public void setConnectionInDBDriverPoolSuccessful(String dbPrefix) {
+ isSuccessful.put(dbPrefix, true);
}
}
diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json
index c84056f..5aac46e 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -25,13 +25,6 @@
"description": "The JDBC user password",
"type": "password"
},
- "default.completer.ttlInSeconds": {
- "envName": null,
- "propertyName": "default.completer.ttlInSeconds",
- "defaultValue": "120",
- "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)",
- "type": "number"
- },
"default.driver": {
"envName": null,
"propertyName": "default.driver",
@@ -39,6 +32,13 @@
"description": "JDBC Driver Name",
"type": "string"
},
+ "default.completer.ttlInSeconds": {
+ "envName": null,
+ "propertyName": "default.completer.ttlInSeconds",
+ "defaultValue": "120",
+ "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)",
+ "type": "number"
+ },
"default.completer.schemaFilters": {
"envName": null,
"propertyName": "default.completer.schemaFilters",
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index f6fe108..92e0c24 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -14,11 +14,12 @@
*/
package org.apache.zeppelin.jdbc;
-import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
-import net.jodah.concurrentunit.Waiter;
+
import org.apache.zeppelin.completer.CompletionType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
@@ -45,6 +46,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
+import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
+import net.jodah.concurrentunit.Waiter;
+
import static java.lang.String.format;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
@@ -55,12 +59,13 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.PRECODE_KEY_TEMPLATE;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.STATEMENT_PRECODE_KEY_TEMPLATE;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
/**
* JDBC interpreter unit tests.
*/
@@ -116,21 +121,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterContext interpreterContext = InterpreterContext.builder()
.setLocalProperties(localProperties)
.build();
- assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getPropertyKey(interpreterContext));
+ assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getDBPrefix(interpreterContext));
localProperties = new HashMap<>();
localProperties.put("db", "mysql");
interpreterContext = InterpreterContext.builder()
.setLocalProperties(localProperties)
.build();
- assertEquals("mysql", t.getPropertyKey(interpreterContext));
+ assertEquals("mysql", t.getDBPrefix(interpreterContext));
localProperties = new HashMap<>();
localProperties.put("hive", "hive");
interpreterContext = InterpreterContext.builder()
.setLocalProperties(localProperties)
.build();
- assertEquals("hive", t.getPropertyKey(interpreterContext));
+ assertEquals("hive", t.getDBPrefix(interpreterContext));
}
@Test
@@ -270,7 +275,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
- assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ assertEquals(interpreterResult.toString(),
+ InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
assertEquals("SOME_OTHER_NAME\na_name\n", resultMessages.get(0).getData());
}
@@ -491,17 +497,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
assertEquals(true, completionList.contains(correctCompletionKeyword));
}
- private Properties getDBProperty(String dbUser,
+ private Properties getDBProperty(String dbPrefix,
+ String dbUser,
String dbPassowrd) throws IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
- properties.setProperty("default.driver", "org.h2.Driver");
- properties.setProperty("default.url", getJdbcConnection());
- if (dbUser != null) {
+ if (!StringUtils.isBlank(dbPrefix)) {
+ properties.setProperty(dbPrefix + ".driver", "org.h2.Driver");
+ properties.setProperty(dbPrefix + ".url", getJdbcConnection());
+ properties.setProperty(dbPrefix + ".user", dbUser);
+ properties.setProperty(dbPrefix + ".password", dbPassowrd);
+ } else {
+ properties.setProperty("default.driver", "org.h2.Driver");
+ properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", dbUser);
- }
- if (dbPassowrd != null) {
properties.setProperty("default.password", dbPassowrd);
}
return properties;
@@ -521,75 +531,95 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
}
@Test
- public void testMultiTenant() throws IOException, InterpreterException {
- /*
- * assume that the database user is 'dbuser' and password is 'dbpassword'
- * 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property
- * 'jdbc2' interpreter doesn't have user/password property
- * 'user1' doesn't have Credential information.
- * 'user2' has 'jdbc2' Credential information that is 'user2Id' / 'user2Pw' as id and password
- */
-
- JDBCInterpreter jdbc1 = new JDBCInterpreter(getDBProperty("dbuser", "dbpassword"));
- JDBCInterpreter jdbc2 = new JDBCInterpreter(getDBProperty("", ""));
-
+ public void testMultiTenant_1() throws IOException, InterpreterException {
+ // user1 %jdbc select from default db
+ // user2 %jdbc select from default db
+ // user2 %jdbc select from hive db
+ Properties properties = getDBProperty("default", "dbuser", "dbpassword");
+ properties.putAll(getDBProperty("hive", "", ""));
+
+ JDBCInterpreter jdbc = new JDBCInterpreter(properties);
AuthenticationInfo user1Credential = getUserAuth("user1", null, null, null);
- AuthenticationInfo user2Credential = getUserAuth("user2", "jdbc.jdbc2", "user2Id", "user2Pw");
+ AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw");
+ jdbc.open();
- // user1 runs jdbc1
- jdbc1.open();
- InterpreterContext ctx1 = InterpreterContext.builder()
- .setAuthenticationInfo(user1Credential)
- .setInterpreterOut(new InterpreterOutput(null))
- .setReplName("jdbc1")
- .build();
- jdbc1.interpret("", ctx1);
+ // user1 runs default
+ InterpreterContext context = InterpreterContext.builder()
+ .setAuthenticationInfo(user1Credential)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setReplName("jdbc")
+ .build();
+ jdbc.interpret("", context);
- JDBCUserConfigurations user1JDBC1Conf = jdbc1.getJDBCConfiguration("user1");
+ JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1");
assertEquals("dbuser", user1JDBC1Conf.getPropertyMap("default").get("user"));
assertEquals("dbpassword", user1JDBC1Conf.getPropertyMap("default").get("password"));
- jdbc1.close();
-
- // user1 runs jdbc2
- jdbc2.open();
- InterpreterContext ctx2 = InterpreterContext.builder()
- .setAuthenticationInfo(user1Credential)
- .setReplName("jdbc2")
- .build();
- jdbc2.interpret("", ctx2);
-
- JDBCUserConfigurations user1JDBC2Conf = jdbc2.getJDBCConfiguration("user1");
- assertNull(user1JDBC2Conf.getPropertyMap("default").get("user"));
- assertNull(user1JDBC2Conf.getPropertyMap("default").get("password"));
- jdbc2.close();
- // user2 runs jdbc1
- jdbc1.open();
- InterpreterContext ctx3 = InterpreterContext.builder()
+ // user2 runs default
+ context = InterpreterContext.builder()
.setAuthenticationInfo(user2Credential)
.setInterpreterOut(new InterpreterOutput(null))
- .setReplName("jdbc1")
+ .setReplName("jdbc")
.build();
- jdbc1.interpret("", ctx3);
+ jdbc.interpret("", context);
- JDBCUserConfigurations user2JDBC1Conf = jdbc1.getJDBCConfiguration("user2");
+ JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
assertEquals("dbuser", user2JDBC1Conf.getPropertyMap("default").get("user"));
assertEquals("dbpassword", user2JDBC1Conf.getPropertyMap("default").get("password"));
- jdbc1.close();
- // user2 runs jdbc2
- jdbc2.open();
- InterpreterContext ctx4 = InterpreterContext.builder()
- .setAuthenticationInfo(user2Credential)
- .setInterpreterOut(new InterpreterOutput(null))
- .setReplName("jdbc2")
- .build();
- jdbc2.interpret("", ctx4);
+ // user2 runs hive
+ Map<String, String> localProperties = new HashMap<>();
+ localProperties.put("db", "hive");
+ context = InterpreterContext.builder()
+ .setAuthenticationInfo(user2Credential)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setLocalProperties(localProperties)
+ .setReplName("jdbc")
+ .build();
+ jdbc.interpret("", context);
+
+ user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
+ assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("hive").get("user"));
+ assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("hive").get("password"));
+
+ jdbc.close();
+ }
+
+ @Test
+ public void testMultiTenant_2() throws IOException, InterpreterException {
+ // user1 %hive select from default db
+ // user2 %hive select from default db
+ Properties properties = getDBProperty("default", "", "");
+ JDBCInterpreter jdbc = new JDBCInterpreter(properties);
+ AuthenticationInfo user1Credential = getUserAuth("user1", "hive", "user1Id", "user1Pw");
+ AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw");
+ jdbc.open();
+
+ // user1 runs default
+ InterpreterContext context = InterpreterContext.builder()
+ .setAuthenticationInfo(user1Credential)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setReplName("hive")
+ .build();
+ jdbc.interpret("", context);
+
+ JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1");
+ assertEquals("user1Id", user1JDBC1Conf.getPropertyMap("default").get("user"));
+ assertEquals("user1Pw", user1JDBC1Conf.getPropertyMap("default").get("password"));
+
+ // user2 runs default
+ context = InterpreterContext.builder()
+ .setAuthenticationInfo(user2Credential)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setReplName("hive")
+ .build();
+ jdbc.interpret("", context);
+
+ JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2");
+ assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("default").get("user"));
+ assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("default").get("password"));
- JDBCUserConfigurations user2JDBC2Conf = jdbc2.getJDBCConfiguration("user2");
- assertEquals("user2Id", user2JDBC2Conf.getPropertyMap("default").get("user"));
- assertEquals("user2Pw", user2JDBC2Conf.getPropertyMap("default").get("password"));
- jdbc2.close();
+ jdbc.close();
}
@Test
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 30d3330..e8c02ac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -501,10 +501,13 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
private InterpreterContext getInterpreterContext() {
AngularObjectRegistry registry = null;
ResourcePool resourcePool = null;
-
+ String replName = null;
if (this.interpreter != null) {
registry = this.interpreter.getInterpreterGroup().getAngularObjectRegistry();
resourcePool = this.interpreter.getInterpreterGroup().getResourcePool();
+ InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
+ interpreter.getInterpreterGroup()).getInterpreterSetting();
+ replName = interpreterSetting.getName();
}
Credentials credentials = note.getCredentials();
@@ -523,7 +526,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
.setNoteId(note.getId())
.setNoteName(note.getName())
.setParagraphId(getId())
- .setReplName(intpText)
+ .setReplName(replName)
.setParagraphTitle(title)
.setParagraphText(text)
.setAuthenticationInfo(subject)