You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2021/04/29 16:59:26 UTC
[ranger] branch ranger-2.2 updated: RANGER-3253: Make incremental
policy change computation more resilient
This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/ranger-2.2 by this push:
new 1eb702a RANGER-3253: Make incremental policy change computation more resilient
1eb702a is described below
commit 1eb702ab45f43636a45961135286214b41bbcd17
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Thu Apr 29 09:04:17 2021 -0700
RANGER-3253: Make incremental policy change computation more resilient
---
.../ranger/plugin/model/RangerPolicyDelta.java | 13 ++--
.../policyengine/RangerPolicyRepository.java | 25 ++++--
.../ranger/plugin/util/RangerPolicyDeltaUtil.java | 55 ++++++++++----
.../java/org/apache/ranger/biz/ServiceDBStore.java | 16 +++-
.../ranger/common/RangerServicePoliciesCache.java | 88 ++++++----------------
.../org/apache/ranger/db/XXPolicyChangeLogDao.java | 32 ++++----
6 files changed, 116 insertions(+), 113 deletions(-)
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
index 1d2b143..5292a98 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
@@ -50,23 +50,24 @@ public class RangerPolicyDelta implements java.io.Serializable {
private Long id;
private Integer changeType;
+ private Long policiesVersion;
private RangerPolicy policy;
public RangerPolicyDelta() {
- this(null, null, null);
+ this(null, null, null, null);
}
- public RangerPolicyDelta(final Long id, final Integer changeType, final RangerPolicy policy) {
+ public RangerPolicyDelta(final Long id, final Integer changeType, final Long policiesVersion, final RangerPolicy policy) {
setId(id);
setChangeType(changeType);
+ setPoliciesVersion(policiesVersion);
setPolicy(policy);
}
public Long getId() { return id; }
public Integer getChangeType() { return changeType; }
- @JsonIgnore
- public Long getPolicyVersion() { return policy != null ? policy.getVersion() : null; }
+ public Long getPoliciesVersion() { return policiesVersion; }
@JsonIgnore
public String getServiceType() { return policy != null ? policy.getServiceType() : null; }
@@ -86,13 +87,15 @@ public class RangerPolicyDelta implements java.io.Serializable {
private void setChangeType(Integer changeType) { this.changeType = changeType; }
+ private void setPoliciesVersion(Long policiesVersion) { this.policiesVersion = policiesVersion; }
+
public void setPolicy(RangerPolicy policy) { this.policy = policy; }
@Override
public String toString() {
return "id:" + id
+ ", changeType:" + changeTypeNames[changeType]
- + ", policyVersion:" + getPolicyVersion()
+ + ", policiesVersion:" + getPoliciesVersion()
+ ", serviceType:" + getServiceType()
+ ", policyType:" + getPolicyType()
+ ", policyId:[" + getPolicyId() + "]"
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
index 20c1a36..caff33a 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
@@ -1208,6 +1208,7 @@ public class RangerPolicyRepository {
}
if (policyDeltaType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
+ removeEvaluatorFromTrie(oldEvaluator, trie, resourceDefName);
addEvaluatorToTrie(newEvaluator, trie, resourceDefName);
} else if (policyDeltaType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
removeEvaluatorFromTrie(oldEvaluator, trie, resourceDefName);
@@ -1227,6 +1228,8 @@ public class RangerPolicyRepository {
if (newEvaluator != null) {
RangerPolicy.RangerPolicyResource resource = newEvaluator.getPolicyResource().get(resourceDefName);
trie.add(resource, newEvaluator);
+ } else {
+ LOG.warn("Unexpected: newPolicyEvaluator is null for resource:[" + resourceDefName + "]");
}
}
@@ -1298,7 +1301,7 @@ public class RangerPolicyRepository {
while (iterator.hasNext()) {
if (id.equals(iterator.next().getId())) {
iterator.remove();
- break;
+ //break;
}
}
@@ -1353,12 +1356,17 @@ public class RangerPolicyRepository {
switch (changeType) {
case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE:
+ if (currentEvaluator != null) {
+ removePolicy(policyId);
+ }
if (policy != null) {
newEvaluator = addPolicy(policy);
}
break;
case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
- removePolicy(policyId);
+ if (currentEvaluator != null) {
+ removePolicy(policyId);
+ }
if (policy != null) {
newEvaluator = addPolicy(policy);
}
@@ -1469,24 +1477,25 @@ public class RangerPolicyRepository {
continue;
}
+ evaluator = getPolicyEvaluator(policyId);
+ if (evaluator != null) {
+ LOG.warn("Unexpected: Found evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_CREATE");
+ }
+
break;
case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE:
evaluator = getPolicyEvaluator(policyId);
if (evaluator == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not find evaluator for policy-id:[" + policyId + "]");
- }
+ LOG.warn("Unexpected: Did not find evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_UPDATE");
}
break;
case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE:
evaluator = getPolicyEvaluator(policyId);
if (evaluator == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not find evaluator for policy-id:[" + policyId + "]");
- }
+ LOG.warn("Unexpected: Did not find evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_DELETE");
}
break;
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
index 4661f79..7088e83 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
@@ -78,26 +78,49 @@ public class RangerPolicyDeltaUtil {
int changeType = delta.getChangeType();
if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
- if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
- if (delta.getPolicy() != null) {
- ret.add(delta.getPolicy());
+ Long policyId = delta.getPolicyId();
+
+ if (policyId == null) {
+ continue;
+ }
+
+ List<RangerPolicy> deletedPolicies = new ArrayList<>();
+
+ Iterator<RangerPolicy> iter = ret.iterator();
+
+ while (iter.hasNext()) {
+ RangerPolicy policy = iter.next();
+ if (policyId.equals(policy.getId())) {
+ deletedPolicies.add(policy);
+ iter.remove();
}
- } else {
- // Either UPDATE or DELETE
- Long policyId = delta.getPolicyId();
-
- Iterator<RangerPolicy> iter = ret.iterator();
- while (iter.hasNext()) {
- if (policyId.equals(iter.next().getId())) {
- iter.remove();
- break;
+ }
+
+ switch(changeType) {
+ case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE: {
+ if (CollectionUtils.isNotEmpty(deletedPolicies)) {
+ LOG.warn("Unexpected: found existing policy for CHANGE_TYPE_POLICY_CREATE: " + Arrays.toString(deletedPolicies.toArray()));
+ }
+ break;
+ }
+ case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
+ if (CollectionUtils.isEmpty(deletedPolicies) || deletedPolicies.size() > 1) {
+ LOG.warn("Unexpected: found no policy or multiple policies for CHANGE_TYPE_POLICY_UPDATE: " + Arrays.toString(deletedPolicies.toArray()));
}
+ break;
}
- if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE) {
- if (delta.getPolicy() != null) {
- ret.add(delta.getPolicy());
+ case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE: {
+ if (CollectionUtils.isEmpty(deletedPolicies) || deletedPolicies.size() > 1) {
+ LOG.warn("Unexpected: found no policy or multiple policies for CHANGE_TYPE_POLICY_DELETE: " + Arrays.toString(deletedPolicies.toArray()));
}
+ break;
}
+ default:
+ break;
+ }
+
+ if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
+ ret.add(delta.getPolicy());
}
} else {
LOG.warn("Found unexpected changeType in policyDelta:[" + delta + "]. Ignoring delta");
@@ -111,7 +134,7 @@ public class RangerPolicyDeltaUtil {
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("applyDeltas(deltas=null, serviceType=" + serviceType + ")");
+ LOG.warn("Unexpected : applyDeltas called with deltas=null");
}
ret = policies;
}
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
index 4fb71f0..52e0c6f 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
@@ -2931,11 +2931,9 @@ public class ServiceDBStore extends AbstractServiceStore {
}
if (ret != null) {
- ret.setPolicyVersion(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getPolicyVersion());
ret.setPolicyUpdateTime(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getPolicyUpdateTime());
ret.setAuditMode(auditMode);
if (ret.getTagPolicies() != null) {
- ret.getTagPolicies().setPolicyVersion(tagServiceVersionInfoDbObj == null ? null : tagServiceVersionInfoDbObj.getPolicyVersion());
ret.getTagPolicies().setPolicyUpdateTime(tagServiceVersionInfoDbObj == null ? null : tagServiceVersionInfoDbObj.getPolicyUpdateTime());
ret.getTagPolicies().setAuditMode(auditMode);
}
@@ -3131,6 +3129,8 @@ public class ServiceDBStore extends AbstractServiceStore {
List<RangerPolicyDelta> resourcePolicyDeltas;
List<RangerPolicyDelta> tagPolicyDeltas = null;
+ Long retrievedPolicyVersion = null;
+ Long retrievedTagPolicyVersion = null;
String componentServiceType = serviceDef.getName();
@@ -3140,7 +3140,9 @@ public class ServiceDBStore extends AbstractServiceStore {
if (CollectionUtils.isNotEmpty(resourcePolicyDeltas)) {
isValid = RangerPolicyDeltaUtil.isValidDeltas(resourcePolicyDeltas, componentServiceType);
- if (!isValid) {
+ if (isValid) {
+ retrievedPolicyVersion = resourcePolicyDeltas.get(resourcePolicyDeltas.size() - 1).getPoliciesVersion();
+ } else {
LOG.warn("Resource policy-Deltas :[" + resourcePolicyDeltas + "] from version :[" + lastKnownVersion + "] are not valid");
}
@@ -3154,7 +3156,9 @@ public class ServiceDBStore extends AbstractServiceStore {
isValid = RangerPolicyDeltaUtil.isValidDeltas(tagPolicyDeltas, tagServiceType);
- if (!isValid) {
+ if (isValid) {
+ retrievedTagPolicyVersion = tagPolicyDeltas.get(tagPolicyDeltas.size() - 1).getPoliciesVersion();
+ } else {
LOG.warn("Tag policy-Deltas :[" + tagPolicyDeltas + "] for service-version :[" + lastKnownVersion + "] and delta-id :[" + id + "] are not valid");
}
}
@@ -3167,7 +3171,9 @@ public class ServiceDBStore extends AbstractServiceStore {
resourcePolicyDeltas.addAll(tagPolicyDeltas);
}
+
List<RangerPolicyDelta> compressedDeltas = compressDeltas(resourcePolicyDeltas);
+
if (compressedDeltas != null) {
ret = new ServicePolicies();
ret.setServiceId(service.getId());
@@ -3175,6 +3181,7 @@ public class ServiceDBStore extends AbstractServiceStore {
ret.setServiceDef(serviceDef);
ret.setPolicies(null);
ret.setPolicyDeltas(compressedDeltas);
+ ret.setPolicyVersion(retrievedPolicyVersion);
if (tagServiceDef != null && tagService != null) {
ServicePolicies.TagPolicies tagPolicies = new ServicePolicies.TagPolicies();
@@ -3182,6 +3189,7 @@ public class ServiceDBStore extends AbstractServiceStore {
tagPolicies.setServiceId(tagService.getId());
tagPolicies.setServiceName(tagService.getName());
tagPolicies.setPolicies(null);
+ tagPolicies.setPolicyVersion(retrievedTagPolicyVersion);
ret.setTagPolicies(tagPolicies);
}
} else {
diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java b/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
index 1176e0b..04aa472 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
@@ -35,6 +35,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -307,34 +308,28 @@ public class RangerServicePoliciesCache {
final List<RangerPolicy> policies = servicePolicies.getPolicies() == null ? new ArrayList<>() : servicePolicies.getPolicies();
final List<RangerPolicy> newPolicies = RangerPolicyDeltaUtil.applyDeltas(policies, servicePoliciesFromDb.getPolicyDeltas(), servicePolicies.getServiceDef().getName());
- boolean isSanityCheckPassed = checkAndLoadCompleteSetOfPolicies(serviceName, serviceStore, newPolicies);
+ servicePolicies.setPolicies(newPolicies);
- if (!isSanityCheckPassed) {
- isCacheReloadedByDQEvent = true;
- } else {
- servicePolicies.setPolicies(newPolicies);
+ checkCacheSanity(serviceName, serviceStore, false);
- // Rebuild tag-policies from original tag-policies and deltas
- if (servicePoliciesFromDb.getTagPolicies() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This service has associated tag service. Will compute tagPolicies from corresponding policy-deltas");
- }
+ // Rebuild tag-policies from original tag-policies and deltas
+ if (servicePoliciesFromDb.getTagPolicies() != null) {
+ String tagServiceName = servicePoliciesFromDb.getTagPolicies().getServiceName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This service has associated tag service:[" + tagServiceName + "]. Will compute tagPolicies from corresponding policy-deltas");
+ }
- final List<RangerPolicy> tagPolicies = (servicePolicies.getTagPolicies() == null || CollectionUtils.isEmpty(servicePolicies.getTagPolicies().getPolicies())) ? new ArrayList<>() : servicePolicies.getTagPolicies().getPolicies();
- final List<RangerPolicy> newTagPolicies = RangerPolicyDeltaUtil.applyDeltas(tagPolicies, servicePoliciesFromDb.getPolicyDeltas(), servicePoliciesFromDb.getTagPolicies().getServiceDef().getName());
+ final List<RangerPolicy> tagPolicies = (servicePolicies.getTagPolicies() == null || CollectionUtils.isEmpty(servicePolicies.getTagPolicies().getPolicies())) ? new ArrayList<>() : servicePolicies.getTagPolicies().getPolicies();
+ final List<RangerPolicy> newTagPolicies = RangerPolicyDeltaUtil.applyDeltas(tagPolicies, servicePoliciesFromDb.getPolicyDeltas(), servicePoliciesFromDb.getTagPolicies().getServiceDef().getName());
- isSanityCheckPassed = checkAndLoadCompleteSetOfPolicies(serviceName, servicePoliciesFromDb.getTagPolicies().getServiceName(), serviceStore, newTagPolicies);
+ servicePolicies.getTagPolicies().setPolicies(newTagPolicies);
+ servicePolicies.getTagPolicies().setPolicyVersion(servicePoliciesFromDb.getTagPolicies().getPolicyVersion());
- if (!isSanityCheckPassed) {
- isCacheReloadedByDQEvent = true;
- } else {
- servicePolicies.getTagPolicies().setPolicies(newTagPolicies);
- }
+ checkCacheSanity(servicePoliciesFromDb.getTagPolicies().getServiceName(), serviceStore, true);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This service has no associated tag service");
- }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This service has no associated tag service");
}
}
}
@@ -361,50 +356,15 @@ public class RangerServicePoliciesCache {
return isCacheReloadedByDQEvent;
}
- private boolean checkAndLoadCompleteSetOfPolicies(String serviceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
- boolean ret = checkCacheSanity(serviceName, serviceStore, policiesFromIncrementalComputation);
-
- if (!ret) {
- loadCompleteSetOfPolicies(serviceName, serviceStore);
- LOG.warn("Policies loaded from db for service:" + serviceName + "] are [" + servicePolicies + "]");
- LOG.warn("Policies in cache for service:[" + serviceName + "] are [" + policiesFromIncrementalComputation + "]");
-
- }
-
- return ret;
- }
-
- private boolean checkAndLoadCompleteSetOfPolicies(String serviceName, String tagServiceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
- boolean ret = checkCacheSanity(tagServiceName, serviceStore, policiesFromIncrementalComputation);
-
- if (!ret) {
- loadCompleteSetOfPolicies(serviceName, serviceStore);
- LOG.warn("Policies loaded from db for service:" + serviceName + "] are [" + servicePolicies + "]");
- LOG.warn("Tag Policies in cache for tag-service:[" + tagServiceName + "] are [" + policiesFromIncrementalComputation + "]");
- }
+ private void checkCacheSanity(String serviceName, ServiceStore serviceStore, boolean isTagService) {
+ final boolean result;
+ Long dbPolicyVersion = serviceStore.getServicePolicyVersion(serviceName);
+ Long cachedPolicyVersion = isTagService ? servicePolicies.getTagPolicies().getPolicyVersion() : servicePolicies.getPolicyVersion();
- return ret;
- }
+ result = Objects.equals(dbPolicyVersion, cachedPolicyVersion);
- private boolean checkCacheSanity(String serviceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
- final boolean ret;
- long dbPoliciesCount = serviceStore.getPoliciesCount(serviceName);
- long cachedPoliciesCount = (CollectionUtils.isEmpty(policiesFromIncrementalComputation) ? 0 : policiesFromIncrementalComputation.size());
- ret = dbPoliciesCount == cachedPoliciesCount;
- if (!ret) {
- LOG.warn("For service:[" + serviceName + "]: dbPoliciesCount:[" + dbPoliciesCount + "], cachedPoliciesCount:[" + cachedPoliciesCount + "]");
- }
- return ret;
-
- }
-
- private void loadCompleteSetOfPolicies(String serviceName, ServiceStore serviceStore) {
- LOG.warn("Something went wrong doing incremental policies for service:[" + serviceName + "]!! Loading all policies!!");
- try {
- servicePolicies = serviceStore.getServicePolicies(serviceName, -1L);
- pruneUnusedAttributes();
- } catch (Exception ex) {
- LOG.warn("Could not get policies from database");
+ if (!result) {
+ LOG.info("checkCacheSanity(serviceName=" + serviceName + "): policy cache has a different version than one in the database. However, changes from " + cachedPolicyVersion + " to " + dbPolicyVersion + " will be downloaded in the next download. policyVersionInDB=" + dbPolicyVersion + ", policyVersionInCache=" + cachedPolicyVersion);
}
}
diff --git a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
index 0a1d1c1..477129d 100644
--- a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
+++ b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
@@ -134,6 +134,7 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
Long logRecordId = (Long) log[POLICY_CHANGE_LOG_RECORD_ID_COLUMN_NUMBER];
Integer policyChangeType = (Integer) log[POLICY_CHANGE_LOG_RECORD_CHANGE_TYPE_COLUMN_NUMBER];
+ Long policiesVersion = (Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_VERSION_COLUMN_NUMBER];
String serviceType = (String) log[POLICY_CHANGE_LOG_RECORD_SERVICE_TYPE_COLUMN_NUMBER];
Long policyId = (Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_ID_COLUMN_NUMBER];
@@ -145,30 +146,29 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
} catch (Exception e) {
LOG.error("Cannot read policy:[" + policyId + "]. Should not have come here!! Offending log-record-id:[" + logRecordId + "] and returning...", e);
ret.clear();
- ret.add(new RangerPolicyDelta(logRecordId, RangerPolicyDelta.CHANGE_TYPE_LOG_ERROR, null));
+ ret.add(new RangerPolicyDelta(logRecordId, RangerPolicyDelta.CHANGE_TYPE_LOG_ERROR, null, null));
break;
}
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Policy:[" + policyId + "] not found - log-record - id:[" + logRecordId + "], PolicyChangeType:[" + policyChangeType + "]");
+ LOG.warn("Policy:[" + policyId + "] not found - log-record - id:[" + logRecordId + "], PolicyChangeType:[" + policyChangeType + "]");
+ if (policyChangeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || policyChangeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE) {
+ LOG.warn("Ignoring POLICY_CREATE or POLICY_UPDATE type change for policy-id:[" + policyId + "] as it was not found.. probably already deleted");
+ continue;
+ } else {
+ // policyChangeType is DELETE
+ policy = new RangerPolicy();
+ policy.setId(policyId);
+ policy.setServiceType(serviceType);
+ policy.setPolicyType((Integer) log[POLICY_CHANGE_LOG_RECORD_POLICY_TYPE_COLUMN_NUMBER]);
+ policy.setZoneName((String) log[POLICY_CHANGE_LOG_RECORD_ZONE_NAME_COLUMN_NUMBER]);
}
-
- // Create a dummy policy as the policy cannot be found - probably already deleted
- policy = new RangerPolicy();
- policy.setId(policyId);
- policy.setVersion((Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_VERSION_COLUMN_NUMBER]);
- policy.setPolicyType((Integer) log[POLICY_CHANGE_LOG_RECORD_POLICY_TYPE_COLUMN_NUMBER]);
- policy.setZoneName((String) log[POLICY_CHANGE_LOG_RECORD_ZONE_NAME_COLUMN_NUMBER]);
}
- policy.setServiceType(serviceType);
- ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, policy));
+ ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, policiesVersion, policy));
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("policyId is null! log-record-id:[" + logRecordId + ", service-type:[" + log[POLICY_CHANGE_LOG_RECORD_SERVICE_TYPE_COLUMN_NUMBER] + "], policy-change-type:[" + log[POLICY_CHANGE_LOG_RECORD_CHANGE_TYPE_COLUMN_NUMBER] + "]");
- }
+ LOG.info("delta-reset-event: log-record-id=" + logRecordId + "; service-type=" + serviceType + "; policy-change-type=" + policyChangeType + ". Discarding " + ret.size() + " deltas");
ret.clear();
- ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, null));
+ ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, null, null));
break;
}
}