You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ve...@apache.org on 2018/12/29 00:53:51 UTC
[ranger] branch master updated: RANGER-2313: tagsync fails to
authenticate with ranger in kerberized cluster when using
ranger-tagsync-update.sh script
This is an automated email from the ASF dual-hosted git repository.
vel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new a97f094 RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
a97f094 is described below
commit a97f0947e192ea67cba64a9ae4be18f6375b2dc3
Author: Abhay Kulkarni <>
AuthorDate: Thu Dec 27 10:18:22 2018 -0800
RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
Signed-off-by: Velmurugan Periasamy <ve...@apache.org>
---
.../ranger/tagsync/process/TagSynchronizer.java | 2 +-
.../source/atlas/EntityNotificationWrapper.java | 4 +-
.../source/atlasrest/AtlasRESTTagSource.java | 177 +++++++++++++--------
.../ranger/tagsync/source/file/FileTagSource.java | 31 ++--
4 files changed, 129 insertions(+), 85 deletions(-)
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 49ff76f..8806c74 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -353,7 +353,7 @@ public class TagSynchronizer {
return tagSource;
}
- private static boolean initializeKerberosIdentity(Properties props) {
+ public static boolean initializeKerberosIdentity(Properties props) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.initializeKerberosIdentity()");
}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
index adaa2f9..9781aa6 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
@@ -93,7 +93,7 @@ public class EntityNotificationWrapper {
isEntityTypeHandled = isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
isEntityDeleteOp = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType();
isEntityCreateOp = EntityNotificationV2.OperationType.ENTITY_CREATE == v2Notification.getOperationType();
- isEmptyClassifications = CollectionUtils.isNotEmpty(atlasEntity.getClassifications());
+ isEmptyClassifications = CollectionUtils.isEmpty(atlasEntity.getClassifications());
List<AtlasClassification> allClassifications = atlasEntity.getClassifications();
@@ -166,7 +166,7 @@ public class EntityNotificationWrapper {
isEntityTypeHandled = isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
isEntityDeleteOp = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType();
isEntityCreateOp = EntityNotificationV1.OperationType.ENTITY_CREATE == v1Notification.getOperationType();
- isEmptyClassifications = CollectionUtils.isNotEmpty(v1Notification.getAllTraits());
+ isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits());
List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 2b4a668..8b12aff 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -69,7 +69,8 @@ import java.util.TimeZone;
public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class);
- private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
+ private static final int REQUESTED_ENTITIES_LIMIT_MAX = 10000;
+ private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
@@ -97,26 +98,34 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
TagSynchronizer.printConfigurationProperties(props);
- TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+ boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
- if (tagSink != null) {
+ if (ret) {
- if (atlasRESTTagSource.initialize(props)) {
- try {
- tagSink.start();
- atlasRESTTagSource.setTagSink(tagSink);
- atlasRESTTagSource.synchUp();
- } catch (Exception exception) {
- LOG.error("ServiceTags upload failed : ", exception);
+ TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+
+ if (tagSink != null) {
+
+ if (atlasRESTTagSource.initialize(props)) {
+ try {
+ tagSink.start();
+ atlasRESTTagSource.setTagSink(tagSink);
+ atlasRESTTagSource.synchUp();
+ } catch (Exception exception) {
+ LOG.error("ServiceTags upload failed : ", exception);
+ System.exit(1);
+ }
+ } else {
+ LOG.error("AtlasRESTTagSource initialization failed, exiting.");
System.exit(1);
}
+
} else {
- LOG.error("AtlasRESTTagSource initialized failed, exiting.");
+ LOG.error("TagSink initialization failed, exiting.");
System.exit(1);
}
-
} else {
- LOG.error("TagSink initialialization failed, exiting.");
+ LOG.error("Error initializing kerberos identity");
System.exit(1);
}
@@ -236,76 +245,104 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getAtlasActiveEntities()");
}
- List<RangerAtlasEntityWithTags> ret = null;
-
- SearchParameters searchParams = new SearchParameters();
- AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
- AtlasSearchResult searchResult = null;
+ List<RangerAtlasEntityWithTags> ret = new ArrayList<>();
- searchParams.setClassification("*");
- searchParams.setIncludeClassificationAttributes(true);
- searchParams.setOffset(0);
- searchParams.setLimit(Integer.MAX_VALUE);
-
- boolean commitUpdates = false;
+ AtlasClientV2 atlasClient = null;
try {
- AtlasClientV2 atlasClient = getAtlasClient();
- searchResult = atlasClient.facetedSearch(searchParams);
- AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
- tty = typeRegistry.lockTypeRegistryForUpdate();
- tty.addTypes(typesDef);
- commitUpdates = true;
- } catch (AtlasServiceException | AtlasBaseException | IOException excp) {
- LOG.error("failed to download tags from Atlas", excp);
- } catch (Exception unexpectedException) {
- LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException);
- } finally {
- if (tty != null) {
- typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
- }
+ atlasClient = getAtlasClient();
+ } catch (IOException exception) {
+ LOG.error("Failed to get Atlas client.", exception);
}
- if (commitUpdates && searchResult != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(AtlasType.toJson(searchResult));
- }
- ret = new ArrayList<>();
- List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
- if (CollectionUtils.isNotEmpty(entityHeaders)) {
- for (AtlasEntityHeader header : entityHeaders) {
- if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]");
- }
- continue;
+ if (atlasClient != null) {
+
+ SearchParameters searchParams = new SearchParameters();
+
+ searchParams.setExcludeDeletedEntities(true);
+ searchParams.setClassification("*");
+ //searchParams.setIncludeSubClassifications(true);
+ //searchParams.setIncludeSubTypes(true);
+ searchParams.setIncludeClassificationAttributes(true);
+ searchParams.setLimit(REQUESTED_ENTITIES_LIMIT_MAX);
+
+ boolean isMoreData;
+ int nextStartIndex = 0;
+
+ do {
+ AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+ AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
+ AtlasSearchResult searchResult = null;
+ boolean commitUpdates = false;
+
+ searchParams.setOffset(nextStartIndex);
+ isMoreData = false;
+
+ try {
+ searchResult = atlasClient.facetedSearch(searchParams);
+ AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
+ tty = typeRegistry.lockTypeRegistryForUpdate();
+ tty.addTypes(typesDef);
+ commitUpdates = true;
+ } catch (AtlasServiceException | AtlasBaseException excp) {
+ LOG.error("failed to download tags from Atlas", excp);
+ ret = null;
+ } catch (Exception unexpectedException) {
+ LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException);
+ ret = null;
+ } finally {
+ if (tty != null) {
+ typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
}
+ }
- String typeName = header.getTypeName();
- if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]");
- }
- continue;
+ if (commitUpdates && searchResult != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(AtlasType.toJson(searchResult));
}
- List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = new ArrayList<>();
+ List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
- for (AtlasClassification classification : header.getClassifications()) {
- List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification);
- if (tags != null) {
- allTagsForEntity.addAll(tags);
- }
- }
+ if (CollectionUtils.isNotEmpty(entityHeaders)) {
- if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
- RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
- RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+ nextStartIndex += entityHeaders.size();
+ isMoreData = true;
- ret.add(entityWithTags);
+ for (AtlasEntityHeader header : entityHeaders) {
+ if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]");
+ }
+ continue;
+ }
+
+ String typeName = header.getTypeName();
+ if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]");
+ }
+ continue;
+ }
+
+ List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = new ArrayList<>();
+
+ for (AtlasClassification classification : header.getClassifications()) {
+ List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification);
+ if (tags != null) {
+ allTagsForEntity.addAll(tags);
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
+ RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
+ RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+
+ ret.add(entityWithTags);
+ }
+ }
}
}
- }
+ } while (isMoreData);
+
}
if (LOG.isDebugEnabled()) {
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index f0a3fd0..62a5f73 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -66,26 +66,33 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
TagSynchronizer.printConfigurationProperties(props);
- TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+ boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
- if (tagSink != null) {
+ if (ret) {
+ TagSink tagSink = TagSynchronizer.initializeTagSink(props);
- if (fileTagSource.initialize(props)) {
- try {
- tagSink.start();
- fileTagSource.setTagSink(tagSink);
- fileTagSource.synchUp();
- } catch (Exception exception) {
- LOG.error("ServiceTags upload failed : ", exception);
+ if (tagSink != null) {
+
+ if (fileTagSource.initialize(props)) {
+ try {
+ tagSink.start();
+ fileTagSource.setTagSink(tagSink);
+ fileTagSource.synchUp();
+ } catch (Exception exception) {
+ LOG.error("ServiceTags upload failed : ", exception);
+ System.exit(1);
+ }
+ } else {
+ LOG.error("FileTagSource initialized failed, exiting.");
System.exit(1);
}
+
} else {
- LOG.error("FileTagSource initialized failed, exiting.");
+ LOG.error("TagSink initialialization failed, exiting.");
System.exit(1);
}
-
} else {
- LOG.error("TagSink initialialization failed, exiting.");
+ LOG.error("Error initializing kerberos identity");
System.exit(1);
}