You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/05/21 12:13:08 UTC

[nifi] branch main updated: NIFI-8373 Add Kerberos support to Accumulo processors

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b6f8441  NIFI-8373 Add Kerberos support to Accumulo processors
b6f8441 is described below

commit b6f84413c4c1b740a7e4c5d5a17930bdbf7bdf18
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Apr 6 07:14:12 2021 +0200

    NIFI-8373 Add Kerberos support to Accumulo processors
    
    This closes #4973.
    
    Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
 .../nifi-accumulo-processors/pom.xml               |   6 +
 .../accumulo/processors/BaseAccumuloProcessor.java |  11 +-
 .../accumulo/processors/PutAccumuloRecord.java     |   3 +
 .../nifi/accumulo/processors/ScanAccumulo.java     |   5 +
 .../controllerservices/BaseAccumuloService.java    |   1 +
 .../nifi-accumulo-services/pom.xml                 |  21 +++
 .../controllerservices/AccumuloService.java        | 185 ++++++++++++++-----
 .../controllerservices/TestAccumuloService.java    | 195 +++++++++++++++++++++
 8 files changed, 381 insertions(+), 46 deletions(-)

diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
index 77d938d..19db6c3 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
@@ -99,6 +99,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-accumulo-services</artifactId>
             <version>1.14.0-SNAPSHOT</version>
             <scope>test</scope>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
index d0888ac..a594d5e 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
@@ -65,12 +65,21 @@ public abstract class BaseAccumuloProcessor extends AbstractProcessor {
             .defaultValue("10")
             .build();
 
+    protected static final PropertyDescriptor ACCUMULO_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("accumulo-timeout")
+            .displayName("Accumulo Timeout")
+            .description("Max amount of time to wait for an unresponsive server. Set to 0 sec for no timeout. Entered value less than 1 second may be converted to 0 sec.")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 sec")
+            .build();
+
     /**
      * Implementations can decide to include all base properties or individually include them. List is immutable
      * so that implementations must constructor their own lists knowingly
      */
 
-    protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS);
+    protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS,ACCUMULO_TIMEOUT);
 
 
 }
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
index 6a751d6..7808b82 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
@@ -279,6 +279,7 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
         BatchWriterConfig writerConfig = new BatchWriterConfig();
         writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
         writerConfig.setMaxMemory(maxBytes.longValue());
+        writerConfig.setTimeout(context.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
         tableWriter = client.createMultiTableBatchWriter(writerConfig);
         flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
         if (!flushOnEveryFlow){
@@ -355,6 +356,8 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
 
         final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
+        accumuloConnectorService.renewTgtIfNecessary();
+
         // create the table if EL is present, create table is true and the table does not exist.
         if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
             final TableOperations tableOps = this.client.tableOperations();
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
index 62d024ac..23aeefd 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -71,6 +71,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
 @EventDriven
@@ -85,6 +86,7 @@ import java.util.concurrent.atomic.LongAdder;
  *
  */
 public class ScanAccumulo extends BaseAccumuloProcessor {
+
     static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
             .displayName("Start key")
             .name("start-key")
@@ -243,10 +245,13 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
 
         boolean cloneFlowFile = incomingFlowFile.isPresent();
 
+        accumuloConnectorService.renewTgtIfNecessary();
+
         try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
             if (!StringUtils.isBlank(startKeyCf) &&  StringUtils.isBlank(endKeyCf))
                 scanner.fetchColumnFamily(new Text(startKeyCf));
             scanner.setRanges(Collections.singleton(lookupRange));
+            scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);
 
             final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
             if (!kvIter.hasNext()){
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
index d92b152..3266ad5 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
@@ -28,5 +28,6 @@ public interface BaseAccumuloService extends ControllerService {
 
 
     AccumuloClient getClient();
+    void renewTgtIfNecessary();
 
 }
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
index f466ddd..30841d3 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -66,5 +66,26 @@
             <artifactId>nifi-accumulo-services-api</artifactId>
             <version>1.14.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
index 91da7fe..38eee69 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -20,7 +20,10 @@ package org.apache.nifi.accumulo.controllerservices;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,14 +36,21 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
@@ -50,16 +60,18 @@ import java.util.List;
  */
 @RequiresInstanceClassLoading
 @Tags({"accumulo", "client", "service"})
-@CapabilityDescription("A controller service for accessing an HBase client.")
+@CapabilityDescription("A controller service for accessing an Accumulo Client.")
 public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
 
-    private enum AuthenticationType{
+    private enum AuthenticationType {
         PASSWORD,
+        KERBEROS,
         NONE
     }
 
     protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
             .name("ZooKeeper Quorum")
+            .displayName("ZooKeeper Quorum")
             .description("Comma-separated list of ZooKeeper hosts for Accumulo.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -67,35 +79,76 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
 
     protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
             .name("Instance Name")
+            .displayName("Instance Name")
             .description("Instance name of the Accumulo cluster")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+            .name("accumulo-authentication-type")
+            .displayName("Authentication Type")
+            .description("Authentication Type")
+            .allowableValues(AuthenticationType.values())
+            .defaultValue(AuthenticationType.PASSWORD.toString())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
 
     protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
             .name("Accumulo User")
+            .displayName("Accumulo User")
             .description("Connecting user for Accumulo")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
             .build();
 
     protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
             .name("Accumulo Password")
-            .description("Connecting user's password when using the PASSWORD Authentication type")
+            .displayName("Accumulo Password")
+            .description("Connecting user's password")
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString())
             .build();
 
-    protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
-            .name("Authentication Type")
-            .description("Authentication Type")
-            .allowableValues(AuthenticationType.values())
-            .defaultValue(AuthenticationType.PASSWORD.toString())
+    protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials Controller Service that should be used for principal + keytab Kerberos authentication")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+            .build();
+
+    protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
+            .name("kerberos-principal")
+            .displayName("Kerberos Principal")
+            .description("Kerberos Principal")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+            .build();
+
+    protected static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
+            .name("kerberos-password")
+            .displayName("Kerberos Password")
+            .description("Kerberos Password")
+            .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
             .build();
 
+    protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new PropertyDescriptor.Builder()
+            .name("accumulo-sasl-qop")
+            .displayName("Accumulo SASL quality of protection")
+            .description("Accumulo SASL quality of protection for KERBEROS Authentication type")
+            .allowableValues("auth", "auth-int", "auth-conf")
+            .defaultValue("auth-conf")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString())
+            .build();
 
     /**
      * Reference to the accumulo client.
@@ -107,34 +160,27 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
      */
     private List<PropertyDescriptor> properties;
 
+    private KerberosUser kerberosUser;
+
+    private AuthenticationType authType;
+
     @Override
-    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+    protected void init(ControllerServiceInitializationContext config) {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(ZOOKEEPER_QUORUM);
         props.add(INSTANCE_NAME);
-        props.add(ACCUMULO_USER);
         props.add(AUTHENTICATION_TYPE);
+        props.add(ACCUMULO_USER);
         props.add(ACCUMULO_PASSWORD);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
+        props.add(KERBEROS_PRINCIPAL);
+        props.add(KERBEROS_PASSWORD);
+        props.add(ACCUMULO_SASL_QOP);
         properties = Collections.unmodifiableList(props);
     }
 
-    private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
-        switch(type){
-            case PASSWORD:
-                return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
-            default:
-                return null;
-        }
-    }
-
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(INSTANCE_NAME);
-        properties.add(ZOOKEEPER_QUORUM);
-        properties.add(ACCUMULO_USER);
-        properties.add(AUTHENTICATION_TYPE);
-        properties.add(ACCUMULO_PASSWORD);
         return properties;
     }
 
@@ -150,22 +196,38 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
             problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
         }
 
-        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
-            problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
-        }
-
         final AuthenticationType type = validationContext.getProperty(
-                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.PASSWORD;
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.NONE;
 
         switch(type){
             case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+                    problems.add(
+                            new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied for the Password Authentication type").build());
+                }
                 if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
                     problems.add(
-                            new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password must be supplied for the Password Authentication type").build());
+                            new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+                                    .explanation("Password must be supplied for the Password Authentication type").build());
+                }
+                break;
+            case KERBEROS:
+                if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                    problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+                            .explanation("Either Kerberos Password or Kerberos Credential Service must be set").build());
+                } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                    problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+                            .explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build());
+                } else if (validationContext.getProperty(KERBEROS_PASSWORD).isSet() && !validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
+                    problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+                            .explanation("Kerberos Principal must be supplied when principal + password Kerberos authentication is used").build());
+                } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+                    problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+                            .explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build());
                 }
                 break;
             default:
-                problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non supported Authentication type").build());
+                problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non supported Authentication type").build());
         }
 
         return problems;
@@ -173,38 +235,71 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
-        if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){
+        if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
             throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
         }
 
-
-
+        final KerberosCredentialsService kerberosService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
-        final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+        this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue());
 
-        final AuthenticationType type = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue() );
+        final Properties clientConf = new Properties();
+        clientConf.setProperty("instance.zookeepers", zookeepers);
+        clientConf.setProperty("instance.name", instanceName);
 
+        switch(authType){
+            case PASSWORD:
+                final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
 
+                final AuthenticationToken token = new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
 
-        final AuthenticationToken token = getToken(type,context);
+                this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
+                break;
+            case KERBEROS:
+                final String principal;
+
+                if (kerberosService == null) {
+                    principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
+                    this.kerberosUser = new KerberosPasswordUser(principal, context.getProperty(KERBEROS_PASSWORD).getValue());
+                } else {
+                    principal = kerberosService.getPrincipal();
+                    this.kerberosUser = new KerberosKeytabUser(principal, kerberosService.getKeytab());
+                }
 
-        this.client = Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+                clientConf.setProperty("sasl.enabled", "true");
+                clientConf.setProperty("sasl.qop", context.getProperty(ACCUMULO_SASL_QOP).getValue());
 
-        if (null == token){
-            throw new InitializationException("Feature not implemented");
-        }
+                //Client uses the currently logged in user's security context, so need to login first.
+                Configuration conf = new Configuration();
+                conf.set("hadoop.security.authentication", "kerberos");
+                UserGroupInformation.setConfiguration(conf);
+                final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
 
+                this.client = clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
+                        Accumulo.newClient().from(clientConf).as(principal, new KerberosToken()).build());
+                break;
+            default:
+                throw new InitializationException("Not supported authentication type.");
+        }
     }
 
     @Override
-    public AccumuloClient getClient(){
+    public AccumuloClient getClient() {
         return client;
     }
 
+    @Override
+    public void renewTgtIfNecessary() {
+        if (authType.equals(AuthenticationType.KERBEROS)) {
+            SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser);
+        }
+    }
+
     @OnDisabled
     public void shutdown() {
-        client.close();
+        if (client != null) {
+            client.close();
+        }
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
new file mode 100644
index 0000000..51332bb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+
+public class TestAccumuloService {
+
+    private static final String INSTANCE = "instance";
+    private static final String ZOOKEEPER = "zookeeper";
+    private static final String PASSWORD = "PASSWORD";
+    private static final String USER = "USER";
+    private static final String KERBEROS = "KERBEROS";
+    private static final String PRINCIPAL = "principal";
+    private static final String KERBEROS_PASSWORD = "kerberos_password";
+    private static final String NONE = "NONE";
+
+    private TestRunner runner;
+    private AccumuloService accumuloService;
+
+    @Mock
+    private KerberosCredentialsService credentialService;
+    @Mock
+    private Processor dummyProcessor;
+
+    @Before
+    public void init() {
+        MockitoAnnotations.initMocks(this);
+
+        runner = TestRunners.newTestRunner(dummyProcessor);
+        accumuloService = new AccumuloService();
+
+        when(credentialService.getIdentifier()).thenReturn("1");
+    }
+
+    @Test
+    public void testServiceValidWithAuthTypePasswordAndInstanceZookeeperUserPasswordAreSet() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+        runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
+        runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
+        //when
+        //then
+        runner.assertValid(accumuloService);
+    }
+
+    @Test
+    public void testServiceNotValidWithInstanceMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Instance name must be supplied");
+    }
+
+    @Test
+    public void testServiceNotValidWithZookeeperMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Zookeepers must be supplied");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypeNone() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, NONE);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Non supported Authentication type");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypePasswordAndUserMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+        runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Accumulo user must be supplied");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypePasswordAndPasswordMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
+        runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Password must be supplied");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or Kerberos Credential Service must be set");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypeKerberosAndKerberosPrincipalMissing() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Kerberos Principal must be supplied");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceSet() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD);
+        runner.addControllerService("kerberos-credentials-service", credentialService);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time");
+    }
+
+    @Test
+    public void testServiceNotValidWithAuthTypeKerberosAndPrincipalAndCredentialServiceSet() throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER);
+        runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
+        runner.addControllerService("kerberos-credentials-service", credentialService);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier());
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out");
+    }
+
+    private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
+        Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService));
+        assertThat(exception.getMessage(), containsString(errorMessage));
+    }
+}
\ No newline at end of file