You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/01/05 16:17:46 UTC

nifi git commit: NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas. This closes #2377

Repository: nifi
Updated Branches:
  refs/heads/master e5ed62a98 -> 4b8c80ccc


NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas. This closes #2377

The reporting task used to hold a single AtlasClientV2 instance
throughout its runtime, from the time it was started until it was stopped.
If it is configured to use Kerberos authentication for the Atlas REST API, then
after an issued DelegationToken expires (10 hours by default), the reporting
task is not able to recover from the resulting 401 Unauthorized state.

In order to avoid getting stuck in such a situation, this commit changes the
way ReportLineageToAtlas uses the AtlasClientV2 instance so that a new
instance is created per onTrigger execution. It also addresses Kerberos ticket
expiration.

This approach incurs some overhead by initializing the client each time;
however, it should be insignificant from an overall processing time
perspective, which includes analyzing the NiFi flow and Provenance records.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b8c80cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b8c80cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b8c80cc

Branch: refs/heads/master
Commit: 4b8c80cccc3207dc9fe6895266af95cca7c72250
Parents: e5ed62a
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jan 5 19:23:28 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jan 5 11:17:33 2018 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/atlas/NiFiAtlasClient.java  | 44 ++------------------
 .../org/apache/nifi/atlas/NiFiAtlasHook.java    | 10 +++--
 .../atlas/reporting/ReportLineageToAtlas.java   | 40 +++++++++++++-----
 .../apache/nifi/atlas/security/Kerberos.java    |  3 +-
 .../apache/nifi/atlas/ITNiFiAtlasClient.java    |  4 +-
 .../atlas/reporting/ITReportLineageToAtlas.java |  1 +
 6 files changed, 44 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index feb2b48..4e95a92 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -18,7 +18,6 @@ package org.apache.nifi.atlas;
 
 import com.sun.jersey.api.client.UniformInterfaceException;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
-import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasServiceException;
@@ -29,14 +28,12 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.nifi.atlas.security.AtlasAuthN;
 import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.core.MultivaluedMap;
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,7 +41,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -82,44 +78,10 @@ public class NiFiAtlasClient {
 
     private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
 
-    private static NiFiAtlasClient nifiClient;
-    private AtlasClientV2 atlasClient;
+    private final AtlasClientV2 atlasClient;
 
-    private NiFiAtlasClient() {
-        super();
-    }
-
-    public static NiFiAtlasClient getInstance() {
-        if (nifiClient == null) {
-            synchronized (NiFiAtlasClient.class) {
-                if (nifiClient == null) {
-                    nifiClient = new NiFiAtlasClient();
-                }
-            }
-        }
-        return nifiClient;
-    }
-
-    public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) {
-
-        synchronized (NiFiAtlasClient.class) {
-
-            if (atlasClient != null) {
-                logger.info("{} had been setup but replacing it with new one.", atlasClient);
-                ApplicationProperties.forceReload();
-            }
-
-            if (atlasConfDir != null) {
-                // If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
-                Properties props = System.getProperties();
-                final String atlasConfProp = "atlas.conf";
-                props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath());
-                logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp));
-            }
-
-            atlasClient = authN.createClient(baseUrls);
-
-        }
+    public NiFiAtlasClient(AtlasClientV2 atlasClient) {
+        this.atlasClient = atlasClient;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
index 43fefff..a15c935 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
@@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
     private static final String CONF_PREFIX = "atlas.hook.nifi.";
     private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
-    private final NiFiAtlasClient atlasClient;
+    private NiFiAtlasClient atlasClient;
 
     /**
      * An index to resolve a qualifiedName from a GUID.
@@ -81,9 +81,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
         };
     }
 
-    public NiFiAtlasHook(NiFiAtlasClient atlasClient) {
-        this.atlasClient = atlasClient;
-
+    public NiFiAtlasHook() {
         final int qualifiedNameCacheSize = 10_000;
         this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
 
@@ -91,6 +89,10 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
         this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
     }
 
+    public void setAtlasClient(NiFiAtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+    }
+
     @Override
     protected String getNumberOfRetriesPropertyKey() {
         return HOOK_NUM_RETRIES;

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 5bb6024..9238f95 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.atlas.reporting;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
     private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
     private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
     private final ServiceLoader<ClusterResolver> clusterResolverLoader = ServiceLoader.load(ClusterResolver.class);
-    private volatile NiFiAtlasClient atlasClient;
+    private volatile AtlasAuthN atlasAuthN;
     private volatile Properties atlasProperties;
     private volatile boolean isTypeDefCreated = false;
     private volatile String defaultClusterName;
@@ -399,13 +400,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
     @OnScheduled
     public void setup(ConfigurationContext context) throws IOException {
         // initAtlasClient has to be done first as it loads AtlasProperty.
-        initAtlasClient(context);
+        initAtlasProperties(context);
         initLineageStrategy(context);
         initClusterResolvers(context);
     }
 
     private void initLineageStrategy(ConfigurationContext context) throws IOException {
-        nifiAtlasHook = new NiFiAtlasHook(atlasClient);
+        nifiAtlasHook = new NiFiAtlasHook();
 
         final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
         if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
@@ -428,7 +429,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
     }
 
 
-    private void initAtlasClient(ConfigurationContext context) throws IOException {
+    private void initAtlasProperties(ConfigurationContext context) throws IOException {
         List<String> urls = new ArrayList<>();
         parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
         final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
@@ -476,7 +477,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
             throw new ProcessException("Default cluster name is not defined.");
         }
 
-        final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
+        atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
         atlasAuthN.configure(context);
 
         // Create Atlas configuration file if necessary.
@@ -497,16 +498,32 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
             }
         }
 
+        getLogger().debug("Force reloading Atlas application properties.");
+        ApplicationProperties.forceReload();
 
-        atlasClient = NiFiAtlasClient.getInstance();
+        if (confDir != null) {
+            // If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
+            Properties props = System.getProperties();
+            final String atlasConfProp = "atlas.conf";
+            props.setProperty(atlasConfProp, confDir.getAbsolutePath());
+            getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
+        }
+    }
+
+    /**
+     * In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration),
+     * create Atlas client instance at every onTrigger execution.
+     */
+    private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
+        List<String> urls = new ArrayList<>();
+        parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
         try {
-            atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, confDir);
+            return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{})));
         } catch (final NullPointerException e) {
             throw new ProcessException(String.format("Failed to initialize Atlas client due to %s." +
                     " Make sure 'atlas-application.properties' is in the directory specified with %s" +
                     " or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e);
         }
-
     }
 
     private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) {
@@ -557,6 +574,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
         // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks.
         final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary();
 
+        final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context);
+
         // Create Entity defs in Atlas if there's none yet.
         if (!isTypeDefCreated) {
             try {
@@ -578,7 +597,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
 
         // Regardless of whether being a primary task node, each node has to analyse NiFiFlow.
         // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism.
-        final NiFiFlow nifiFlow = createNiFiFlow(context);
+        final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
 
 
         if (isResponsibleForPrimaryTasks) {
@@ -592,11 +611,12 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
         // NOTE: There is a race condition between the primary node and other nodes.
         // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node,
         // then the notification message will fail due to having a reference to a non-existing entity.
+        nifiAtlasHook.setAtlasClient(atlasClient);
         consumeNiFiProvenanceEvents(context, nifiFlow);
 
     }
 
-    private NiFiFlow createNiFiFlow(ReportingContext context) {
+    private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {
         final ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root");
         final String flowName = rootProcessGroup.getName();
         final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
index 88feba0..ab55b49 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -73,7 +73,8 @@ public class Kerberos implements AtlasAuthN {
         UserGroupInformation.setConfiguration(hadoopConf);
         final UserGroupInformation ugi;
         try {
-            ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            ugi = UserGroupInformation.getCurrentUser();
         } catch (IOException e) {
             throw new RuntimeException("Failed to login with Kerberos due to: " + e, e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
index f1727b0..69a3042 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
@@ -39,14 +39,14 @@ public class ITNiFiAtlasClient {
 
     @Before
     public void setup() {
-        atlasClient = NiFiAtlasClient.getInstance();
         // Add your atlas server ip address into /etc/hosts as atlas.example.com
         PropertyContext propertyContext = mock(PropertyContext.class);
         when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new MockPropertyValue("admin"));
         when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new MockPropertyValue("admin"));
         final AtlasAuthN atlasAuthN = new Basic();
         atlasAuthN.configure(propertyContext);
-        atlasClient.initialize(new String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null);
+
+        atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new String[]{"http://atlas.example.com:21000/"}));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index 2fe7d07..e83495a 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -409,6 +409,7 @@ public class ITReportLineageToAtlas {
         when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus);
 
         final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class);
+        when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus);
         when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
         when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords);
         when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);