You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/07/26 23:53:49 UTC

[griffin] branch master updated: [GRIFFIN-269] Griffin submit job to livy server with Kerberos Authentication

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new c2acabe  [GRIFFIN-269] Griffin submit job to livy server with Kerberos Authentication
c2acabe is described below

commit c2acabe6a085d10416d89589fa67e0c4759b63a6
Author: qwang6 <qw...@gmail.com>
AuthorDate: Sat Jul 27 07:53:32 2019 +0800

    [GRIFFIN-269] Griffin submit job to livy server with Kerberos Authentication
    
    In production environment, submit to Livy Server usually using Kerberos Auth. Add three properties to check whether using Kerberos Auth or not.
    
    livy.need.kerberos=false
    livy.server.auth.kerberos.principal=livy/kerberos.principal
    livy.server.auth.kerberos.keytab=/path/to/livy/keytab/file
    
    Author: qwang6 <qw...@gmail.com>
    
    Closes #514 from qwang6/master.
---
 service/pom.xml                                    |   4 +
 .../apache/griffin/core/job/JobServiceImpl.java    |   8 +-
 .../griffin/core/job/LivyTaskSubmitHelper.java     | 137 ++++++++++++++++++---
 .../apache/griffin/core/job/SparkSubmitJob.java    |  60 +++------
 .../griffin/core/job/StreamingJobOperatorImpl.java |   9 +-
 service/src/main/resources/application.properties  |   3 +
 6 files changed, 153 insertions(+), 68 deletions(-)

diff --git a/service/pom.xml b/service/pom.xml
index b6c4ade..8f51587 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -191,6 +191,10 @@ under the License.
                     <groupId>javax.servlet</groupId>
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 302dc43..bd8988c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -136,10 +136,8 @@ public class JobServiceImpl implements JobService {
     @Autowired
     private LivyTaskSubmitHelper livyTaskSubmitHelper;
 
-    private RestTemplate restTemplate;
-
     public JobServiceImpl() {
-        restTemplate = new RestTemplate();
+
     }
 
     @Override
@@ -538,7 +536,9 @@ public class JobServiceImpl implements JobService {
             new TypeReference<HashMap<String, Object>>() {
             };
         try {
-            String resultStr = restTemplate.getForObject(uri, String.class);
+            String resultStr = livyTaskSubmitHelper.getFromLivy(uri);
+            LOGGER.info(resultStr);
+
             HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr,
                 type);
             setJobInstanceIdAndUri(instance, resultMap);
diff --git a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
index 2dd9cbb..d9ee9e8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
+++ b/service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java
@@ -19,32 +19,38 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.collections.map.HashedMap;
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.security.kerberos.client.KerberosRestTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
 
+import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import org.apache.commons.collections.map.HashedMap;
-import org.quartz.JobDetail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.core.env.Environment;
-import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestTemplate;
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
 
 @Component
 public class LivyTaskSubmitHelper {
@@ -216,8 +222,105 @@ public class LivyTaskSubmitHelper {
     private Map<String, Object> getResultByLivyId(Object livyBatchesId, TypeReference<HashMap<String, Object>> type)
             throws IOException {
         Map<String, Object> resultMap = new HashedMap();
-        String newResult = restTemplate.getForObject(uri + "/" + livyBatchesId, String.class);
-        return newResult == null ? resultMap : toEntity(newResult, type);
+        String livyUri = uri + "/" + livyBatchesId;
+        String result = getFromLivy(livyUri);
+        logger.info(result);
+        return result == null ? resultMap : toEntity(result, type);
+    }
+
+    public String postToLivy(String uri) {
+        logger.info("Post To Livy URI is: " + uri);
+        String needKerberos = env.getProperty("livy.need.kerberos");
+        logger.info("Need Kerberos:" + needKerberos);
+
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        headers.set(REQUEST_BY_HEADER,"admin");
+
+        if (needKerberos == null || needKerberos.isEmpty()) {
+            logger.error("The property \"livy.need.kerberos\" is empty");
+            return null;
+        }
+
+        if (needKerberos.equalsIgnoreCase("false")) {
+            logger.info("The livy server doesn't need Kerberos Authentication");
+            String result = null;
+            try {
+
+                HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap),headers);
+                result = restTemplate.postForObject(uri,springEntity,String.class);
+
+                logger.info(result);
+            } catch (JsonProcessingException e) {
+                logger.error("Post to livy ERROR. \n {}", e.getMessage());
+            }
+            return result;
+        } else {
+            logger.info("The livy server needs Kerberos Authentication");
+            String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
+            String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
+            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+
+            KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
+            HttpEntity<String> springEntity = null;
+            try {
+                springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            }
+            String result = restTemplate.postForObject(uri, springEntity, String.class);
+            logger.info(result);
+            return result;
+        }
+    }
+
+    public String getFromLivy(String uri) {
+        logger.info("Get From Livy URI is: " + uri);
+        String needKerberos = env.getProperty("livy.need.kerberos");
+        logger.info("Need Kerberos:" + needKerberos);
+
+        if (needKerberos == null || needKerberos.isEmpty()) {
+            logger.error("The property \"livy.need.kerberos\" is empty");
+            return null;
+        }
+
+        if (needKerberos.equalsIgnoreCase("false")) {
+            logger.info("The livy server doesn't need Kerberos Authentication");
+            return restTemplate.getForObject(uri, String.class);
+        } else {
+            logger.info("The livy server needs Kerberos Authentication");
+            String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
+            String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
+            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+
+            KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
+            String result = restTemplate.getForObject(uri, String.class);
+            logger.info(result);
+            return result;
+        }
     }
 
+    public void deleteByLivy(String uri) {
+        logger.info("Delete by Livy URI is: " + uri);
+        String needKerberos = env.getProperty("livy.need.kerberos");
+        logger.info("Need Kerberos:" + needKerberos);
+
+        if (needKerberos == null || needKerberos.isEmpty()) {
+            logger.error("The property \"livy.need.kerberos\" is empty");
+            return;
+        }
+
+        if (needKerberos.equalsIgnoreCase("false")) {
+            logger.info("The livy server doesn't need Kerberos Authentication");
+            new RestTemplate().delete(uri);
+        } else {
+            logger.info("The livy server needs Kerberos Authentication");
+            String userPrincipal = env.getProperty("livy.server.auth.kerberos.principal");
+            String keyTabLocation = env.getProperty("livy.server.auth.kerberos.keytab");
+            logger.info("principal:{}, lcoation:{}", userPrincipal, keyTabLocation);
+
+            KerberosRestTemplate restTemplate = new KerberosRestTemplate(keyTabLocation, userPrincipal);
+            restTemplate.delete(uri);
+        }
+    }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index f97dc83..01e41ca 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -19,28 +19,7 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
-import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
-import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
-import static org.apache.griffin.core.job.JobInstance.JOB_NAME;
-import static org.apache.griffin.core.job.JobInstance.MEASURE_KEY;
-import static org.apache.griffin.core.job.JobInstance.PREDICATES_KEY;
-import static org.apache.griffin.core.job.JobInstance.PREDICATE_JOB_NAME;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
-import static org.apache.griffin.core.util.JsonUtil.toEntity;
-import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
-
 import com.fasterxml.jackson.core.type.TypeReference;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.job.factory.PredicatorFactory;
@@ -60,15 +39,27 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.core.env.Environment;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
-import org.springframework.web.client.HttpClientErrorException;
 import org.springframework.web.client.RestTemplate;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
+import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
+import static org.apache.griffin.core.job.JobInstance.*;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
 @Component
@@ -128,24 +119,7 @@ public class SparkSubmitJob implements Job {
     }
 
     private String post2Livy() {
-        String result = null;
-        try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.APPLICATION_JSON);
-            headers.set(REQUEST_BY_HEADER,"admin");
-
-            HttpEntity<String> springEntity = new HttpEntity<String>(toJsonWithFormat(livyConfMap),headers);
-            result = restTemplate.postForObject(livyUri,springEntity,String.class);
-
-            LOGGER.info(result);
-        } catch (HttpClientErrorException e) {
-            LOGGER.error("Post to livy ERROR. \n {} {}",
-                    e.getMessage(),
-                    e.getResponseBodyAsString());
-        } catch (Exception e) {
-            LOGGER.error("Post to livy ERROR. {}", e.getMessage());
-        }
-        return result;
+        return livyTaskSubmitHelper.postToLivy(livyUri);
     }
 
     private boolean success(List<SegmentPredicate> predicates) {
diff --git a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
index 36b23ce..e7ff758 100644
--- a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
@@ -56,7 +56,6 @@ import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
 import org.springframework.web.client.ResourceAccessException;
 import org.springframework.web.client.RestClientException;
-import org.springframework.web.client.RestTemplate;
 
 @Service
 public class StreamingJobOperatorImpl implements JobOperator {
@@ -72,13 +71,13 @@ public class StreamingJobOperatorImpl implements JobOperator {
     private JobInstanceRepo instanceRepo;
     @Autowired
     private SchedulerFactoryBean factory;
+    @Autowired
+    private LivyTaskSubmitHelper livyTaskSubmitHelper;
 
     private String livyUri;
-    private RestTemplate restTemplate;
 
     @PostConstruct
     public void init() {
-        restTemplate = new RestTemplate();
         livyUri = env.getProperty("livy.uri");
     }
 
@@ -233,7 +232,9 @@ public class StreamingJobOperatorImpl implements JobOperator {
         }
         String url = livyUri + "/" + instance.getSessionId();
         try {
-            restTemplate.delete(url);
+            // Use livy helper to interact with livy
+            livyTaskSubmitHelper.deleteByLivy(url);
+
             LOGGER.info("Job instance({}) has been deleted. {}", instance
                     .getSessionId(), url);
         } catch (ResourceAccessException e) {
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index a6c61bd..c01c270 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -70,6 +70,9 @@ livy.need.queue=false
 livy.task.max.concurrent.count=20
 livy.task.submit.interval.second=3
 livy.task.appId.retry.count=3
+livy.need.kerberos=false
+livy.server.auth.kerberos.principal=livy/kerberos.principal
+livy.server.auth.kerberos.keytab=/path/to/livy/keytab/file
 # yarn url
 yarn.uri=http://localhost:8088
 # griffin event listener