You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pr...@apache.org on 2017/07/07 06:19:50 UTC

zeppelin git commit: [ZEPPELIN-2698] Apply KerberosInterpreter to JDBCInterpreter

Repository: zeppelin
Updated Branches:
  refs/heads/master c3e908e61 -> e1f0a3205


[ZEPPELIN-2698] Apply KerberosInterpreter to JDBCInterpreter

### What is this PR for?
This applies the new KerberosInterpreter mechanism to JDBCInterpreter so that it periodically re-logs in from the keytab, instead of re-logging in only after a query has failed.

### What type of PR is it?
[Refactoring]

### What is the Jira issue?
* [ZEPPELIN-2698](https://issues.apache.org/jira/browse/ZEPPELIN-2698)

### How should this be tested?
In the JDBC interpreter settings, add the following properties:
- zeppelin.jdbc.auth.type = KERBEROS
- zeppelin.jdbc.principal = principal value
- zeppelin.jdbc.keytab.location = keytab location

Now run any Hive query (say `show tables`); it should return valid results.
Then wait for the Kerberos ticket to expire (usually after 24 hours) and run the query again; it should still work.

### Questions:
* Do the license files need an update? N/A
* Are there breaking changes for older versions? N/A
* Does this need documentation? N/A

Author: Prabhjyot Singh <pr...@gmail.com>
Author: prabhjyotsingh <pr...@gmail.com>

Closes #2443 from prabhjyotsingh/ZEPPELIN-2698 and squashes the following commits:

835b4bd03 [Prabhjyot Singh] check for invalid user input; in case of error fall back to default values
a5a54d466 [Prabhjyot Singh] runKerberosLogin block should return false
582372744 [Prabhjyot Singh] change schedule to submit so it runs without wait for the first time. LAUNCH_KERBEROS_REFRESH_INTERVAL to KERBEROS_REFRESH_INTERVAL
7fe883c3e [Prabhjyot Singh] @zjffdu review comments
7f8b8672b [prabhjyotsingh] call `startKerberosLoginThread` and `shutdownExecutorService` in parent class
57ea80c0c [Prabhjyot Singh] apply KerberosInterpreter to JDBCInterpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e1f0a320
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e1f0a320
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e1f0a320

Branch: refs/heads/master
Commit: e1f0a3205eb5aede0b2d80c9d3de59b3f47b699c
Parents: c3e908e
Author: Prabhjyot Singh <pr...@gmail.com>
Authored: Wed Jul 5 21:14:53 2017 +0530
Committer: Prabhjyot Singh <pr...@gmail.com>
Committed: Fri Jul 7 11:49:43 2017 +0530

----------------------------------------------------------------------
 conf/zeppelin-env.sh.template                   |  2 +-
 .../apache/zeppelin/jdbc/JDBCInterpreter.java   | 79 ++++++++++----------
 .../apache/zeppelin/shell/ShellInterpreter.java | 41 +++++++---
 .../shell/security/ShellSecurityImpl.java       | 59 ---------------
 .../interpreter/KerberosInterpreter.java        | 78 ++++++++++++++-----
 5 files changed, 130 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index a9eccf6..7bc38d6 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -56,7 +56,7 @@
 ## Kerberos ticket refresh setting
 ##
 #export KINIT_FAIL_THRESHOLD                    # (optional) How many times should kinit retry. The default value is 5.
-#export LAUNCH_KERBEROS_REFRESH_INTERVAL        # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
+#export KERBEROS_REFRESH_INTERVAL               # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
 
 ## Use provided spark installation ##
 ## defining SPARK_HOME makes Zeppelin run spark interpreter process using spark-submit

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 72d7981..948914f 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -50,6 +50,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.KerberosInterpreter;
 import org.apache.zeppelin.interpreter.ResultMessages;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
@@ -89,7 +90,7 @@ import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMeth
  * }
  * </p>
  */
-public class JDBCInterpreter extends Interpreter {
+public class JDBCInterpreter extends KerberosInterpreter {
 
   private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
 
@@ -147,12 +148,29 @@ public class JDBCInterpreter extends Interpreter {
     maxLineResults = MAX_LINE_DEFAULT;
   }
 
+  @Override
+  protected boolean runKerberosLogin() {
+    try {
+      if (UserGroupInformation.isLoginKeytabBased()) {
+        UserGroupInformation.getLoginUser().reloginFromKeytab();
+        return true;
+      } else if (UserGroupInformation.isLoginTicketBased()) {
+        UserGroupInformation.getLoginUser().reloginFromTicketCache();
+        return true;
+      }
+    } catch (Exception e) {
+      logger.error("Unable to run kinit for zeppelin", e);
+    }
+    return false;
+  }
+
   public HashMap<String, Properties> getPropertiesMap() {
     return basePropretiesMap;
   }
 
   @Override
   public void open() {
+    super.open();
     for (String propertyKey : property.stringPropertyNames()) {
       logger.debug("propertyKey: {}", propertyKey);
       String[] keyValue = propertyKey.split("\\.", 2);
@@ -190,6 +208,16 @@ public class JDBCInterpreter extends Interpreter {
     setMaxLineResults();
   }
 
+
+  protected boolean isKerboseEnabled() {
+    UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
+    if (authType.equals(KERBEROS)) {
+      return true;
+    }
+    return false;
+  }
+
+
   private void setMaxLineResults() {
     if (basePropretiesMap.containsKey(COMMON_KEY) &&
         basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
@@ -259,6 +287,7 @@ public class JDBCInterpreter extends Interpreter {
 
   @Override
   public void close() {
+    super.close();
     try {
       initStatementMap();
       initConnectionPoolMap();
@@ -709,49 +738,17 @@ public class JDBCInterpreter extends Interpreter {
       }
       getJDBCConfiguration(user).removeStatement(paragraphId);
     } catch (Throwable e) {
-      if (e.getCause() instanceof TTransportException &&
-          Throwables.getStackTraceAsString(e).contains("GSS") &&
-          getJDBCConfiguration(user).isConnectionInDBDriverPoolSuccessful(propertyKey)) {
-        return reLoginFromKeytab(propertyKey, sql, interpreterContext, interpreterResult);
-      } else {
-        logger.error("Cannot run " + sql, e);
-        String errorMsg = Throwables.getStackTraceAsString(e);
-        try {
-          closeDBPool(user, propertyKey);
-        } catch (SQLException e1) {
-          logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
-        }
-        interpreterResult.add(errorMsg);
-        return new InterpreterResult(Code.ERROR, interpreterResult.message());
-      }
-    }
-    return interpreterResult;
-  }
-
-  private InterpreterResult reLoginFromKeytab(String propertyKey, String sql,
-     InterpreterContext interpreterContext, InterpreterResult interpreterResult) {
-    String user = interpreterContext.getAuthenticationInfo().getUser();
-    try {
-      closeDBPool(user, propertyKey);
-    } catch (SQLException e) {
-      logger.error("Error, could not close DB pool in reLoginFromKeytab ", e);
-    }
-    UserGroupInformation.AuthenticationMethod authType =
-        JDBCSecurityImpl.getAuthtype(property);
-    if (authType.equals(KERBEROS)) {
+      logger.error("Cannot run " + sql, e);
+      String errorMsg = Throwables.getStackTraceAsString(e);
       try {
-        if (UserGroupInformation.isLoginKeytabBased()) {
-          UserGroupInformation.getLoginUser().reloginFromKeytab();
-        } else if (UserGroupInformation.isLoginTicketBased()) {
-          UserGroupInformation.getLoginUser().reloginFromTicketCache();
-        }
-      } catch (IOException e) {
-        logger.error("Cannot reloginFromKeytab " + sql, e);
-        interpreterResult.add(e.getMessage());
-        return new InterpreterResult(Code.ERROR, interpreterResult.message());
+        closeDBPool(user, propertyKey);
+      } catch (SQLException e1) {
+        logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
       }
+      interpreterResult.add(errorMsg);
+      return new InterpreterResult(Code.ERROR, interpreterResult.message());
     }
-    return executeSql(propertyKey, sql, interpreterContext);
+    return interpreterResult;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index 79fc3a3..07eed5f 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -31,13 +31,13 @@ import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.KerberosInterpreter;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.shell.security.ShellSecurityImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,17 +57,14 @@ public class ShellInterpreter extends KerberosInterpreter {
 
   @Override
   public void open() {
+    super.open();
     LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
     executors = new ConcurrentHashMap<>();
-    if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
-      startKerberosLoginThread();
-    }
   }
 
   @Override
   public void close() {
-    shutdownExecutorService();
-
+    super.close();
     for (String executorKey : executors.keySet()) {
       DefaultExecutor executor = executors.remove(executorKey);
       if (executor != null) {
@@ -163,12 +160,38 @@ public class ShellInterpreter extends KerberosInterpreter {
   @Override
   protected boolean runKerberosLogin() {
     try {
-      ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
+      createSecureConfiguration();
+      return true;
     } catch (Exception e) {
       LOGGER.error("Unable to run kinit for zeppelin", e);
-      return false;
     }
-    return true;
+    return false;
+  }
+
+  public void createSecureConfiguration() {
+    Properties properties = getProperty();
+    CommandLine cmdLine = CommandLine.parse(shell);
+    cmdLine.addArgument("-c", false);
+    String kinitCommand = String.format("kinit -k -t %s %s",
+        properties.getProperty("zeppelin.shell.keytab.location"),
+        properties.getProperty("zeppelin.shell.principal"));
+    cmdLine.addArgument(kinitCommand, false);
+    DefaultExecutor executor = new DefaultExecutor();
+    try {
+      executor.execute(cmdLine);
+    } catch (Exception e) {
+      LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e);
+      throw new InterpreterException(e);
+    }
+  }
+
+  @Override
+  protected boolean isKerboseEnabled() {
+    if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
+        "zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
+      return true;
+    }
+    return false;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java b/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
deleted file mode 100644
index ecfdb0c..0000000
--- a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.shell.security;
-
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-
-/***
- * Shell security helper
- */
-public class ShellSecurityImpl {
-
-  private static Logger LOGGER = LoggerFactory.getLogger(ShellSecurityImpl.class);
-
-  public static void createSecureConfiguration(Properties properties, String shell) {
-
-    String authType = properties.getProperty("zeppelin.shell.auth.type")
-      .trim().toUpperCase();
-
-    switch (authType) {
-      case "KERBEROS":
-        CommandLine cmdLine = CommandLine.parse(shell);
-        cmdLine.addArgument("-c", false);
-        String kinitCommand = String.format("kinit -k -t %s %s",
-            properties.getProperty("zeppelin.shell.keytab.location"),
-            properties.getProperty("zeppelin.shell.principal"));
-        cmdLine.addArgument(kinitCommand, false);
-        DefaultExecutor executor = new DefaultExecutor();
-
-        try {
-          int exitVal = executor.execute(cmdLine);
-        } catch (Exception e) {
-          LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e);
-          throw new InterpreterException(e);
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e1f0a320/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
index 4673e48..4da5ef5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
@@ -31,15 +31,24 @@ import org.slf4j.LoggerFactory;
 /**
  * Interpreter wrapper for Kerberos initialization
  *
- * runKerberosLogin() method you need to implement that determine Zeppelin's behavior.
+ * runKerberosLogin() method you need to implement that determine how should this interpeter do a
+ * kinit for this interpreter.
+ * isKerboseEnabled() method needs to implement which determines if the kerberos is enabled for that
+ * interpreter.
  * startKerberosLoginThread() needs to be called inside the open() and
  * shutdownExecutorService() inside close().
+ *
+ * 
+ * Environment variables defined in zeppelin-env.sh
+ * KERBEROS_REFRESH_INTERVAL controls the refresh interval for Kerberos ticket. The default value
+ * is 1d.
+ * KINIT_FAIL_THRESHOLD controls how many times should kinit retry. The default value is 5.
  */
 public abstract class KerberosInterpreter extends Interpreter {
 
   private Integer kinitFailCount = 0;
-  protected ScheduledExecutorService scheduledExecutorService;
-  public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
+  private ScheduledExecutorService scheduledExecutorService;
+  private static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
 
   public KerberosInterpreter(Properties property) {
     super(property);
@@ -48,23 +57,54 @@ public abstract class KerberosInterpreter extends Interpreter {
   @ZeppelinApi
   protected abstract boolean runKerberosLogin();
 
-  public String getKerberosRefreshInterval() {
-    if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
-      return "1d";
-    } else {
-      return System.getenv("KERBEROS_REFRESH_INTERVAL");
+  @ZeppelinApi
+  protected abstract boolean isKerboseEnabled();
+
+  public void open() {
+    if (isKerboseEnabled()) {
+      startKerberosLoginThread();
+    }
+  }
+
+  public void close() {
+    if (isKerboseEnabled()) {
+      shutdownExecutorService();
     }
   }
 
-  public Integer kinitFailThreshold() {
-    if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
-      return 5;
-    } else {
-      return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
+  private Long getKerberosRefreshInterval() {
+    Long refreshInterval;
+    String refreshIntervalString = "1d";
+    //defined in zeppelin-env.sh, if not initialized then the default value is one day.
+    if (System.getenv("KERBEROS_REFRESH_INTERVAL") != null) {
+      refreshIntervalString = System.getenv("KERBEROS_REFRESH_INTERVAL");
+    }
+    try {
+      refreshInterval = getTimeAsMs(refreshIntervalString);
+    } catch (IllegalArgumentException e) {
+      logger.error("Cannot get time in MS for the given string, " + refreshIntervalString
+          + " defaulting to 1d ", e);
+      refreshInterval = getTimeAsMs("1d");
+    }
+
+    return refreshInterval;
+  }
+
+  private Integer kinitFailThreshold() {
+    Integer kinitFailThreshold = 5;
+    //defined in zeppelin-env.sh, if not initialized then the default value is 5.
+    if (System.getenv("KINIT_FAIL_THRESHOLD") != null) {
+      try {
+        kinitFailThreshold = new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
+      } catch (Exception e) {
+        logger.error("Cannot get integer value from the given string, " + System
+            .getenv("KINIT_FAIL_THRESHOLD") + " defaulting to " + kinitFailThreshold, e);
+      }
     }
+    return kinitFailThreshold;
   }
 
-  public Long getTimeAsMs(String time) {
+  private Long getTimeAsMs(String time) {
     if (time == null) {
       logger.error("Cannot convert to time value.", time);
       time = "1d";
@@ -86,10 +126,10 @@ public abstract class KerberosInterpreter extends Interpreter {
         suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
   }
 
-  protected ScheduledExecutorService startKerberosLoginThread() {
+  private ScheduledExecutorService startKerberosLoginThread() {
     scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
-    scheduledExecutorService.schedule(new Callable() {
+    scheduledExecutorService.submit(new Callable() {
       public Object call() throws Exception {
 
         if (runKerberosLogin()) {
@@ -97,7 +137,7 @@ public abstract class KerberosInterpreter extends Interpreter {
           kinitFailCount = 0;
           // schedule another kinit run with a fixed delay.
           scheduledExecutorService
-              .schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
+              .schedule(this, getKerberosRefreshInterval(), TimeUnit.MILLISECONDS);
         } else {
           kinitFailCount++;
           logger.info("runKerberosLogin failed for " + kinitFailCount + " time(s).");
@@ -111,12 +151,12 @@ public abstract class KerberosInterpreter extends Interpreter {
         }
         return null;
       }
-    }, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
+    });
 
     return scheduledExecutorService;
   }
 
-  protected void shutdownExecutorService() {
+  private void shutdownExecutorService() {
     if (scheduledExecutorService != null) {
       scheduledExecutorService.shutdown();
     }