You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ve...@apache.org on 2018/12/29 00:53:51 UTC

[ranger] branch master updated: RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script

This is an automated email from the ASF dual-hosted git repository.

vel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new a97f094  RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
a97f094 is described below

commit a97f0947e192ea67cba64a9ae4be18f6375b2dc3
Author: Abhay Kulkarni <>
AuthorDate: Thu Dec 27 10:18:22 2018 -0800

    RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
    
    Signed-off-by: Velmurugan Periasamy <ve...@apache.org>
---
 .../ranger/tagsync/process/TagSynchronizer.java    |   2 +-
 .../source/atlas/EntityNotificationWrapper.java    |   4 +-
 .../source/atlasrest/AtlasRESTTagSource.java       | 177 +++++++++++++--------
 .../ranger/tagsync/source/file/FileTagSource.java  |  31 ++--
 4 files changed, 129 insertions(+), 85 deletions(-)

diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 49ff76f..8806c74 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -353,7 +353,7 @@ public class TagSynchronizer {
 		return tagSource;
 	}
 
-	private static boolean initializeKerberosIdentity(Properties props) {
+	public static boolean initializeKerberosIdentity(Properties props) {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> TagSynchronizer.initializeKerberosIdentity()");
 		}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
index adaa2f9..9781aa6 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
@@ -93,7 +93,7 @@ public class EntityNotificationWrapper {
                 isEntityTypeHandled    = isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
                 isEntityDeleteOp       = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType();
                 isEntityCreateOp       = EntityNotificationV2.OperationType.ENTITY_CREATE == v2Notification.getOperationType();
-                isEmptyClassifications = CollectionUtils.isNotEmpty(atlasEntity.getClassifications());
+                isEmptyClassifications = CollectionUtils.isEmpty(atlasEntity.getClassifications());
 
                 List<AtlasClassification> allClassifications = atlasEntity.getClassifications();
 
@@ -166,7 +166,7 @@ public class EntityNotificationWrapper {
                 isEntityTypeHandled    = isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
                 isEntityDeleteOp       = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType();
                 isEntityCreateOp       = EntityNotificationV1.OperationType.ENTITY_CREATE == v1Notification.getOperationType();
-                isEmptyClassifications = CollectionUtils.isNotEmpty(v1Notification.getAllTraits());
+                isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits());
 
                 List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits();
 
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 2b4a668..8b12aff 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -69,7 +69,8 @@ import java.util.TimeZone;
 public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class);
 
-    private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
+    	private static final int REQUESTED_ENTITIES_LIMIT_MAX = 10000;
+    	private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
 		@Override
 		protected DateFormat initialValue() {
 			SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
@@ -97,26 +98,34 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 		TagSynchronizer.printConfigurationProperties(props);
 
-		TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+		boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
 
-		if (tagSink != null) {
+		if (ret) {
 
-			if (atlasRESTTagSource.initialize(props)) {
-				try {
-					tagSink.start();
-					atlasRESTTagSource.setTagSink(tagSink);
-					atlasRESTTagSource.synchUp();
-				} catch (Exception exception) {
-					LOG.error("ServiceTags upload failed : ", exception);
+			TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+
+			if (tagSink != null) {
+
+				if (atlasRESTTagSource.initialize(props)) {
+					try {
+						tagSink.start();
+						atlasRESTTagSource.setTagSink(tagSink);
+						atlasRESTTagSource.synchUp();
+					} catch (Exception exception) {
+						LOG.error("ServiceTags upload failed : ", exception);
+						System.exit(1);
+					}
+				} else {
+					LOG.error("AtlasRESTTagSource initialization failed, exiting.");
 					System.exit(1);
 				}
+
 			} else {
-				LOG.error("AtlasRESTTagSource initialized failed, exiting.");
+				LOG.error("TagSink initialization failed, exiting.");
 				System.exit(1);
 			}
-
 		} else {
-			LOG.error("TagSink initialialization failed, exiting.");
+			LOG.error("Error initializing kerberos identity");
 			System.exit(1);
 		}
 
@@ -236,76 +245,104 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> getAtlasActiveEntities()");
         }
-        List<RangerAtlasEntityWithTags> ret = null;
-
-        SearchParameters searchParams = new SearchParameters();
-        AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
-        AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
-        AtlasSearchResult searchResult = null;
+        List<RangerAtlasEntityWithTags> ret         = new ArrayList<>();
 
-        searchParams.setClassification("*");
-        searchParams.setIncludeClassificationAttributes(true);
-        searchParams.setOffset(0);
-        searchParams.setLimit(Integer.MAX_VALUE);
-
-        boolean commitUpdates = false;
+        AtlasClientV2                   atlasClient = null;
         try {
-            AtlasClientV2 atlasClient = getAtlasClient();
-            searchResult = atlasClient.facetedSearch(searchParams);
-            AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
-            tty = typeRegistry.lockTypeRegistryForUpdate();
-            tty.addTypes(typesDef);
-            commitUpdates = true;
-        } catch (AtlasServiceException | AtlasBaseException | IOException excp) {
-            LOG.error("failed to download tags from Atlas", excp);
-        } catch (Exception unexpectedException) {
-            LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException);
-        } finally {
-            if (tty != null) {
-                typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
-            }
+            atlasClient = getAtlasClient();
+        } catch (IOException exception) {
+            LOG.error("Failed to get Atlas client.", exception);
         }
 
-        if (commitUpdates && searchResult != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(AtlasType.toJson(searchResult));
-            }
-            ret = new ArrayList<>();
-            List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
-            if (CollectionUtils.isNotEmpty(entityHeaders)) {
-                for (AtlasEntityHeader header : entityHeaders) {
-                    if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]");
-                        }
-                        continue;
+        if (atlasClient != null) {
+
+            SearchParameters searchParams = new SearchParameters();
+
+            searchParams.setExcludeDeletedEntities(true);
+            searchParams.setClassification("*");
+            //searchParams.setIncludeSubClassifications(true);
+            //searchParams.setIncludeSubTypes(true);
+            searchParams.setIncludeClassificationAttributes(true);
+            searchParams.setLimit(REQUESTED_ENTITIES_LIMIT_MAX);
+
+            boolean isMoreData;
+            int     nextStartIndex = 0;
+
+            do {
+                AtlasTypeRegistry                            typeRegistry  = new AtlasTypeRegistry();
+                AtlasTypeRegistry.AtlasTransientTypeRegistry tty           = null;
+                AtlasSearchResult                            searchResult  = null;
+                boolean                                      commitUpdates = false;
+
+                searchParams.setOffset(nextStartIndex);
+                isMoreData = false;
+
+                try {
+                    searchResult = atlasClient.facetedSearch(searchParams);
+                    AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
+                    tty = typeRegistry.lockTypeRegistryForUpdate();
+                    tty.addTypes(typesDef);
+                    commitUpdates = true;
+                } catch (AtlasServiceException | AtlasBaseException excp) {
+                    LOG.error("failed to download tags from Atlas", excp);
+                    ret = null;
+                } catch (Exception unexpectedException) {
+                    LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException);
+                    ret = null;
+                } finally {
+                    if (tty != null) {
+                        typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
                     }
+                }
 
-                    String typeName = header.getTypeName();
-                    if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]");
-                        }
-                        continue;
+                if (commitUpdates && searchResult != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(AtlasType.toJson(searchResult));
                     }
 
-                    List<EntityNotificationWrapper.RangerAtlasClassification>          allTagsForEntity       = new ArrayList<>();
+                    List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
 
-                    for (AtlasClassification classification : header.getClassifications()) {
-                        List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification);
-                        if (tags != null) {
-                            allTagsForEntity.addAll(tags);
-                        }
-                    }
+                    if (CollectionUtils.isNotEmpty(entityHeaders)) {
 
-                    if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
-                        RangerAtlasEntity         entity         = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
-                        RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+                        nextStartIndex += entityHeaders.size();
+                        isMoreData = true;
 
-                        ret.add(entityWithTags);
+                        for (AtlasEntityHeader header : entityHeaders) {
+                            if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]");
+                                }
+                                continue;
+                            }
+
+                            String typeName = header.getTypeName();
+                            if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]");
+                                }
+                                continue;
+                            }
+
+                            List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = new ArrayList<>();
+
+                            for (AtlasClassification classification : header.getClassifications()) {
+                                List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification);
+                                if (tags != null) {
+                                    allTagsForEntity.addAll(tags);
+                                }
+                            }
+
+                            if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
+                                RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
+                                RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+
+                                ret.add(entityWithTags);
+                            }
+                        }
                     }
                 }
-            }
+            } while (isMoreData);
+
         }
 
         if (LOG.isDebugEnabled()) {
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index f0a3fd0..62a5f73 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -66,26 +66,33 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 
 		TagSynchronizer.printConfigurationProperties(props);
 
-		TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+		boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
 
-		if (tagSink != null) {
+		if (ret) {
+			TagSink tagSink = TagSynchronizer.initializeTagSink(props);
 
-			if (fileTagSource.initialize(props)) {
-				try {
-					tagSink.start();
-					fileTagSource.setTagSink(tagSink);
-					fileTagSource.synchUp();
-				} catch (Exception exception) {
-					LOG.error("ServiceTags upload failed : ", exception);
+			if (tagSink != null) {
+
+				if (fileTagSource.initialize(props)) {
+					try {
+						tagSink.start();
+						fileTagSource.setTagSink(tagSink);
+						fileTagSource.synchUp();
+					} catch (Exception exception) {
+						LOG.error("ServiceTags upload failed : ", exception);
+						System.exit(1);
+					}
+				} else {
+					LOG.error("FileTagSource initialized failed, exiting.");
 					System.exit(1);
 				}
+
 			} else {
-				LOG.error("FileTagSource initialized failed, exiting.");
+				LOG.error("TagSink initialialization failed, exiting.");
 				System.exit(1);
 			}
-
 		} else {
-			LOG.error("TagSink initialialization failed, exiting.");
+			LOG.error("Error initializing kerberos identity");
 			System.exit(1);
 		}