You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pr...@apache.org on 2017/07/07 06:19:50 UTC
zeppelin git commit: [ZEPPELIN-2698] Apply KerberosInterpreter to
JDBCInterpreter
Repository: zeppelin
Updated Branches:
refs/heads/master c3e908e61 -> e1f0a3205
[ZEPPELIN-2698] Apply KerberosInterpreter to JDBCInterpreter
### What is this PR for?
This applies the new KerberosInterpreter mechanism to JDBCInterpreter so that it periodically re-logs in from the keytab, instead of re-logging in only after a query fails.
### What type of PR is it?
[Refactoring]
### What is the Jira issue?
* [ZEPPELIN-2698](https://issues.apache.org/jira/browse/ZEPPELIN-2698)
### How should this be tested?
In the JDBC interpreter settings, add the following properties:
- zeppelin.jdbc.auth.type = KERBEROS
- zeppelin.jdbc.principal = principal value
- zeppelin.jdbc.keytab.location = keytab location
Now try running any Hive query (say `show tables`); it should return valid results.
Then wait for the Kerberos ticket to expire (usually after 24 hours) and run the query again; it should still work.
### Questions:
* Do the license files need an update? N/A
* Are there breaking changes for older versions? N/A
* Does this need documentation? N/A
Author: Prabhjyot Singh <pr...@gmail.com>
Author: prabhjyotsingh <pr...@gmail.com>
Closes #2443 from prabhjyotsingh/ZEPPELIN-2698 and squashes the following commits:
835b4bd03 [Prabhjyot Singh] check for invalid user input; in case of error fall back to default values
a5a54d466 [Prabhjyot Singh] runKerberosLogin block should return false
582372744 [Prabhjyot Singh] change schedule to submit so it runs without wait for the first time. LAUNCH_KERBEROS_REFRESH_INTERVAL to KERBEROS_REFRESH_INTERVAL
7fe883c3e [Prabhjyot Singh] @zjffdu review comments
7f8b8672b [prabhjyotsingh] call `startKerberosLoginThread` and `shutdownExecutorService` in parent class
57ea80c0c [Prabhjyot Singh] apply KerberosInterpreter to JDBCInterpreter
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e1f0a320
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e1f0a320
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e1f0a320
Branch: refs/heads/master
Commit: e1f0a3205eb5aede0b2d80c9d3de59b3f47b699c
Parents: c3e908e
Author: Prabhjyot Singh <pr...@gmail.com>
Authored: Wed Jul 5 21:14:53 2017 +0530
Committer: Prabhjyot Singh <pr...@gmail.com>
Committed: Fri Jul 7 11:49:43 2017 +0530
----------------------------------------------------------------------
conf/zeppelin-env.sh.template | 2 +-
.../apache/zeppelin/jdbc/JDBCInterpreter.java | 79 ++++++++++----------
.../apache/zeppelin/shell/ShellInterpreter.java | 41 +++++++---
.../shell/security/ShellSecurityImpl.java | 59 ---------------
.../interpreter/KerberosInterpreter.java | 78 ++++++++++++++-----
5 files changed, 130 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index a9eccf6..7bc38d6 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -56,7 +56,7 @@
## Kerberos ticket refresh setting
##
#export KINIT_FAIL_THRESHOLD # (optional) How many times should kinit retry. The default value is 5.
-#export LAUNCH_KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
+#export KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
## Use provided spark installation ##
## defining SPARK_HOME makes Zeppelin run spark interpreter process using spark-submit
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 72d7981..948914f 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -50,6 +50,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.KerberosInterpreter;
import org.apache.zeppelin.interpreter.ResultMessages;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
@@ -89,7 +90,7 @@ import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMeth
* }
* </p>
*/
-public class JDBCInterpreter extends Interpreter {
+public class JDBCInterpreter extends KerberosInterpreter {
private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
@@ -147,12 +148,29 @@ public class JDBCInterpreter extends Interpreter {
maxLineResults = MAX_LINE_DEFAULT;
}
+ @Override
+ protected boolean runKerberosLogin() {
+ try {
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ return true;
+ } else if (UserGroupInformation.isLoginTicketBased()) {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ return true;
+ }
+ } catch (Exception e) {
+ logger.error("Unable to run kinit for zeppelin", e);
+ }
+ return false;
+ }
+
public HashMap<String, Properties> getPropertiesMap() {
return basePropretiesMap;
}
@Override
public void open() {
+ super.open();
for (String propertyKey : property.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
@@ -190,6 +208,16 @@ public class JDBCInterpreter extends Interpreter {
setMaxLineResults();
}
+
+ protected boolean isKerboseEnabled() {
+ UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
+ if (authType.equals(KERBEROS)) {
+ return true;
+ }
+ return false;
+ }
+
+
private void setMaxLineResults() {
if (basePropretiesMap.containsKey(COMMON_KEY) &&
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
@@ -259,6 +287,7 @@ public class JDBCInterpreter extends Interpreter {
@Override
public void close() {
+ super.close();
try {
initStatementMap();
initConnectionPoolMap();
@@ -709,49 +738,17 @@ public class JDBCInterpreter extends Interpreter {
}
getJDBCConfiguration(user).removeStatement(paragraphId);
} catch (Throwable e) {
- if (e.getCause() instanceof TTransportException &&
- Throwables.getStackTraceAsString(e).contains("GSS") &&
- getJDBCConfiguration(user).isConnectionInDBDriverPoolSuccessful(propertyKey)) {
- return reLoginFromKeytab(propertyKey, sql, interpreterContext, interpreterResult);
- } else {
- logger.error("Cannot run " + sql, e);
- String errorMsg = Throwables.getStackTraceAsString(e);
- try {
- closeDBPool(user, propertyKey);
- } catch (SQLException e1) {
- logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
- }
- interpreterResult.add(errorMsg);
- return new InterpreterResult(Code.ERROR, interpreterResult.message());
- }
- }
- return interpreterResult;
- }
-
- private InterpreterResult reLoginFromKeytab(String propertyKey, String sql,
- InterpreterContext interpreterContext, InterpreterResult interpreterResult) {
- String user = interpreterContext.getAuthenticationInfo().getUser();
- try {
- closeDBPool(user, propertyKey);
- } catch (SQLException e) {
- logger.error("Error, could not close DB pool in reLoginFromKeytab ", e);
- }
- UserGroupInformation.AuthenticationMethod authType =
- JDBCSecurityImpl.getAuthtype(property);
- if (authType.equals(KERBEROS)) {
+ logger.error("Cannot run " + sql, e);
+ String errorMsg = Throwables.getStackTraceAsString(e);
try {
- if (UserGroupInformation.isLoginKeytabBased()) {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- } else if (UserGroupInformation.isLoginTicketBased()) {
- UserGroupInformation.getLoginUser().reloginFromTicketCache();
- }
- } catch (IOException e) {
- logger.error("Cannot reloginFromKeytab " + sql, e);
- interpreterResult.add(e.getMessage());
- return new InterpreterResult(Code.ERROR, interpreterResult.message());
+ closeDBPool(user, propertyKey);
+ } catch (SQLException e1) {
+ logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
}
+ interpreterResult.add(errorMsg);
+ return new InterpreterResult(Code.ERROR, interpreterResult.message());
}
- return executeSql(propertyKey, sql, interpreterContext);
+ return interpreterResult;
}
/**
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index 79fc3a3..07eed5f 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -31,13 +31,13 @@ import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.KerberosInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.shell.security.ShellSecurityImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,17 +57,14 @@ public class ShellInterpreter extends KerberosInterpreter {
@Override
public void open() {
+ super.open();
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
executors = new ConcurrentHashMap<>();
- if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
- startKerberosLoginThread();
- }
}
@Override
public void close() {
- shutdownExecutorService();
-
+ super.close();
for (String executorKey : executors.keySet()) {
DefaultExecutor executor = executors.remove(executorKey);
if (executor != null) {
@@ -163,12 +160,38 @@ public class ShellInterpreter extends KerberosInterpreter {
@Override
protected boolean runKerberosLogin() {
try {
- ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
+ createSecureConfiguration();
+ return true;
} catch (Exception e) {
LOGGER.error("Unable to run kinit for zeppelin", e);
- return false;
}
- return true;
+ return false;
+ }
+
+ public void createSecureConfiguration() {
+ Properties properties = getProperty();
+ CommandLine cmdLine = CommandLine.parse(shell);
+ cmdLine.addArgument("-c", false);
+ String kinitCommand = String.format("kinit -k -t %s %s",
+ properties.getProperty("zeppelin.shell.keytab.location"),
+ properties.getProperty("zeppelin.shell.principal"));
+ cmdLine.addArgument(kinitCommand, false);
+ DefaultExecutor executor = new DefaultExecutor();
+ try {
+ executor.execute(cmdLine);
+ } catch (Exception e) {
+ LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e);
+ throw new InterpreterException(e);
+ }
+ }
+
+ @Override
+ protected boolean isKerboseEnabled() {
+ if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
+ "zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
+ return true;
+ }
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java b/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
deleted file mode 100644
index ecfdb0c..0000000
--- a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.shell.security;
-
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-
-/***
- * Shell security helper
- */
-public class ShellSecurityImpl {
-
- private static Logger LOGGER = LoggerFactory.getLogger(ShellSecurityImpl.class);
-
- public static void createSecureConfiguration(Properties properties, String shell) {
-
- String authType = properties.getProperty("zeppelin.shell.auth.type")
- .trim().toUpperCase();
-
- switch (authType) {
- case "KERBEROS":
- CommandLine cmdLine = CommandLine.parse(shell);
- cmdLine.addArgument("-c", false);
- String kinitCommand = String.format("kinit -k -t %s %s",
- properties.getProperty("zeppelin.shell.keytab.location"),
- properties.getProperty("zeppelin.shell.principal"));
- cmdLine.addArgument(kinitCommand, false);
- DefaultExecutor executor = new DefaultExecutor();
-
- try {
- int exitVal = executor.execute(cmdLine);
- } catch (Exception e) {
- LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e);
- throw new InterpreterException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
index 4673e48..4da5ef5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
@@ -31,15 +31,24 @@ import org.slf4j.LoggerFactory;
/**
* Interpreter wrapper for Kerberos initialization
*
- * runKerberosLogin() method you need to implement that determine Zeppelin's behavior.
+ * runKerberosLogin() method you need to implement that determine how should this interpeter do a
+ * kinit for this interpreter.
+ * isKerboseEnabled() method needs to implement which determines if the kerberos is enabled for that
+ * interpreter.
* startKerberosLoginThread() needs to be called inside the open() and
* shutdownExecutorService() inside close().
+ *
+ *
+ * Environment variables defined in zeppelin-env.sh
+ * KERBEROS_REFRESH_INTERVAL controls the refresh interval for Kerberos ticket. The default value
+ * is 1d.
+ * KINIT_FAIL_THRESHOLD controls how many times should kinit retry. The default value is 5.
*/
public abstract class KerberosInterpreter extends Interpreter {
private Integer kinitFailCount = 0;
- protected ScheduledExecutorService scheduledExecutorService;
- public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
+ private ScheduledExecutorService scheduledExecutorService;
+ private static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
public KerberosInterpreter(Properties property) {
super(property);
@@ -48,23 +57,54 @@ public abstract class KerberosInterpreter extends Interpreter {
@ZeppelinApi
protected abstract boolean runKerberosLogin();
- public String getKerberosRefreshInterval() {
- if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
- return "1d";
- } else {
- return System.getenv("KERBEROS_REFRESH_INTERVAL");
+ @ZeppelinApi
+ protected abstract boolean isKerboseEnabled();
+
+ public void open() {
+ if (isKerboseEnabled()) {
+ startKerberosLoginThread();
+ }
+ }
+
+ public void close() {
+ if (isKerboseEnabled()) {
+ shutdownExecutorService();
}
}
- public Integer kinitFailThreshold() {
- if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
- return 5;
- } else {
- return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
+ private Long getKerberosRefreshInterval() {
+ Long refreshInterval;
+ String refreshIntervalString = "1d";
+ //defined in zeppelin-env.sh, if not initialized then the default value is one day.
+ if (System.getenv("KERBEROS_REFRESH_INTERVAL") != null) {
+ refreshIntervalString = System.getenv("KERBEROS_REFRESH_INTERVAL");
+ }
+ try {
+ refreshInterval = getTimeAsMs(refreshIntervalString);
+ } catch (IllegalArgumentException e) {
+ logger.error("Cannot get time in MS for the given string, " + refreshIntervalString
+ + " defaulting to 1d ", e);
+ refreshInterval = getTimeAsMs("1d");
+ }
+
+ return refreshInterval;
+ }
+
+ private Integer kinitFailThreshold() {
+ Integer kinitFailThreshold = 5;
+ //defined in zeppelin-env.sh, if not initialized then the default value is 5.
+ if (System.getenv("KINIT_FAIL_THRESHOLD") != null) {
+ try {
+ kinitFailThreshold = new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
+ } catch (Exception e) {
+ logger.error("Cannot get integer value from the given string, " + System
+ .getenv("KINIT_FAIL_THRESHOLD") + " defaulting to " + kinitFailThreshold, e);
+ }
}
+ return kinitFailThreshold;
}
- public Long getTimeAsMs(String time) {
+ private Long getTimeAsMs(String time) {
if (time == null) {
logger.error("Cannot convert to time value.", time);
time = "1d";
@@ -86,10 +126,10 @@ public abstract class KerberosInterpreter extends Interpreter {
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
}
- protected ScheduledExecutorService startKerberosLoginThread() {
+ private ScheduledExecutorService startKerberosLoginThread() {
scheduledExecutorService = Executors.newScheduledThreadPool(1);
- scheduledExecutorService.schedule(new Callable() {
+ scheduledExecutorService.submit(new Callable() {
public Object call() throws Exception {
if (runKerberosLogin()) {
@@ -97,7 +137,7 @@ public abstract class KerberosInterpreter extends Interpreter {
kinitFailCount = 0;
// schedule another kinit run with a fixed delay.
scheduledExecutorService
- .schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
+ .schedule(this, getKerberosRefreshInterval(), TimeUnit.MILLISECONDS);
} else {
kinitFailCount++;
logger.info("runKerberosLogin failed for " + kinitFailCount + " time(s).");
@@ -111,12 +151,12 @@ public abstract class KerberosInterpreter extends Interpreter {
}
return null;
}
- }, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
+ });
return scheduledExecutorService;
}
- protected void shutdownExecutorService() {
+ private void shutdownExecutorService() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
}