You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2021/04/29 16:59:26 UTC

[ranger] branch ranger-2.2 updated: RANGER-3253: Make incremental policy change computation more resilient

This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/ranger-2.2 by this push:
     new 1eb702a  RANGER-3253: Make incremental policy change computation more resilient
1eb702a is described below

commit 1eb702ab45f43636a45961135286214b41bbcd17
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Thu Apr 29 09:04:17 2021 -0700

    RANGER-3253: Make incremental policy change computation more resilient
---
 .../ranger/plugin/model/RangerPolicyDelta.java     | 13 ++--
 .../policyengine/RangerPolicyRepository.java       | 25 ++++--
 .../ranger/plugin/util/RangerPolicyDeltaUtil.java  | 55 ++++++++++----
 .../java/org/apache/ranger/biz/ServiceDBStore.java | 16 +++-
 .../ranger/common/RangerServicePoliciesCache.java  | 88 ++++++----------------
 .../org/apache/ranger/db/XXPolicyChangeLogDao.java | 32 ++++----
 6 files changed, 116 insertions(+), 113 deletions(-)

diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
index 1d2b143..5292a98 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
@@ -50,23 +50,24 @@ public class RangerPolicyDelta implements java.io.Serializable {
 
     private Long                id;
     private Integer             changeType;
+    private Long                policiesVersion;
     private RangerPolicy        policy;
 
     public RangerPolicyDelta() {
-        this(null, null, null);
+        this(null, null, null, null);
     }
 
-    public RangerPolicyDelta(final Long id, final Integer changeType, final RangerPolicy policy) {
+    public RangerPolicyDelta(final Long id, final Integer changeType, final Long policiesVersion, final RangerPolicy policy) {
         setId(id);
         setChangeType(changeType);
+        setPoliciesVersion(policiesVersion);
         setPolicy(policy);
     }
     public Long getId() { return id; }
 
     public Integer getChangeType() { return changeType; }
 
-    @JsonIgnore
-    public Long getPolicyVersion() { return policy != null ? policy.getVersion() : null; }
+    public Long getPoliciesVersion() { return policiesVersion; }
 
     @JsonIgnore
     public String getServiceType() { return policy != null ? policy.getServiceType() : null; }
@@ -86,13 +87,15 @@ public class RangerPolicyDelta implements java.io.Serializable {
 
     private void setChangeType(Integer changeType) { this.changeType = changeType; }
 
+    private void setPoliciesVersion(Long policiesVersion) { this.policiesVersion = policiesVersion; }
+
     public void setPolicy(RangerPolicy policy) { this.policy = policy; }
 
     @Override
     public String toString() {
         return "id:" + id
                 + ", changeType:" + changeTypeNames[changeType]
-                + ", policyVersion:" + getPolicyVersion()
+                + ", policiesVersion:" + getPoliciesVersion()
                 + ", serviceType:" + getServiceType()
                 + ", policyType:" + getPolicyType()
                 + ", policyId:[" + getPolicyId() + "]"
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
index 20c1a36..caff33a 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
@@ -1208,6 +1208,7 @@ public class RangerPolicyRepository {
             }
 
             if (policyDeltaType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
+                removeEvaluatorFromTrie(oldEvaluator, trie, resourceDefName);
                 addEvaluatorToTrie(newEvaluator, trie, resourceDefName);
             } else if (policyDeltaType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
                 removeEvaluatorFromTrie(oldEvaluator, trie, resourceDefName);
@@ -1227,6 +1228,8 @@ public class RangerPolicyRepository {
         if (newEvaluator != null) {
             RangerPolicy.RangerPolicyResource resource = newEvaluator.getPolicyResource().get(resourceDefName);
             trie.add(resource, newEvaluator);
+        } else {
+            LOG.warn("Unexpected: newPolicyEvaluator is null for resource:[" + resourceDefName + "]");
         }
     }
 
@@ -1298,7 +1301,7 @@ public class RangerPolicyRepository {
         while (iterator.hasNext()) {
             if (id.equals(iterator.next().getId())) {
                 iterator.remove();
-                break;
+                //break;
             }
         }
 
@@ -1353,12 +1356,17 @@ public class RangerPolicyRepository {
 
         switch (changeType) {
             case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE:
+                if (currentEvaluator != null) {
+                    removePolicy(policyId);
+                }
                 if (policy != null) {
                     newEvaluator = addPolicy(policy);
                 }
                 break;
             case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
-                removePolicy(policyId);
+                if (currentEvaluator != null) {
+                    removePolicy(policyId);
+                }
                 if (policy != null) {
                     newEvaluator = addPolicy(policy);
                 }
@@ -1469,24 +1477,25 @@ public class RangerPolicyRepository {
 
                         continue;
                     }
+                    evaluator = getPolicyEvaluator(policyId);
+                    if (evaluator != null) {
+                        LOG.warn("Unexpected: Found evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_CREATE");
+                    }
+
                     break;
 
                 case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE:
                     evaluator = getPolicyEvaluator(policyId);
 
                     if (evaluator == null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Could not find evaluator for policy-id:[" + policyId + "]");
-                        }
+                        LOG.warn("Unexpected:  Did not find evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_UPDATE");
                     }
                     break;
 
                 case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE:
                     evaluator = getPolicyEvaluator(policyId);
                     if (evaluator == null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Could not find evaluator for policy-id:[" + policyId + "]");
-                        }
+                        LOG.warn("Unexpected:  Did not find evaluator for policy-id:[" + policyId + "], changeType=CHANGE_TYPE_POLICY_DELETE");
                     }
                     break;
 
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
index 4661f79..7088e83 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerPolicyDeltaUtil.java
@@ -78,26 +78,49 @@ public class RangerPolicyDeltaUtil {
                     int changeType = delta.getChangeType();
 
                     if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE || changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
-                        if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
-                            if (delta.getPolicy() != null) {
-                                ret.add(delta.getPolicy());
+                        Long policyId = delta.getPolicyId();
+
+                        if (policyId == null) {
+                            continue;
+                        }
+
+                        List<RangerPolicy>     deletedPolicies = new ArrayList<>();
+
+                        Iterator<RangerPolicy> iter = ret.iterator();
+
+                        while (iter.hasNext()) {
+                            RangerPolicy policy = iter.next();
+                            if (policyId.equals(policy.getId())) {
+                                deletedPolicies.add(policy);
+                                iter.remove();
                             }
-                        } else {
-                            // Either UPDATE or DELETE
-                            Long policyId = delta.getPolicyId();
-
-                            Iterator<RangerPolicy> iter = ret.iterator();
-                            while (iter.hasNext()) {
-                                if (policyId.equals(iter.next().getId())) {
-                                    iter.remove();
-                                    break;
+                        }
+
+                        switch(changeType) {
+                            case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE: {
+                                if (CollectionUtils.isNotEmpty(deletedPolicies)) {
+                                    LOG.warn("Unexpected: found existing policy for CHANGE_TYPE_POLICY_CREATE: " + Arrays.toString(deletedPolicies.toArray()));
+                                }
+                                break;
+                            }
+                            case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
+                                if (CollectionUtils.isEmpty(deletedPolicies) || deletedPolicies.size() > 1) {
+                                    LOG.warn("Unexpected: found no policy or multiple policies for CHANGE_TYPE_POLICY_UPDATE: " + Arrays.toString(deletedPolicies.toArray()));
                                 }
+                                break;
                             }
-                            if (changeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE) {
-                                if (delta.getPolicy() != null) {
-                                    ret.add(delta.getPolicy());
+                            case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE: {
+                                if (CollectionUtils.isEmpty(deletedPolicies) || deletedPolicies.size() > 1) {
+                                    LOG.warn("Unexpected: found no policy or multiple policies for CHANGE_TYPE_POLICY_DELETE: " + Arrays.toString(deletedPolicies.toArray()));
                                 }
+                                break;
                             }
+                            default:
+                                break;
+                        }
+
+                        if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
+                            ret.add(delta.getPolicy());
                         }
                     } else {
                         LOG.warn("Found unexpected changeType in policyDelta:[" + delta + "]. Ignoring delta");
@@ -111,7 +134,7 @@ public class RangerPolicyDeltaUtil {
             }
         } else {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("applyDeltas(deltas=null, serviceType=" + serviceType + ")");
+                LOG.warn("Unexpected : applyDeltas called with deltas=null");
             }
             ret = policies;
         }
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
index 4fb71f0..52e0c6f 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
@@ -2931,11 +2931,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 		}
 
 		if (ret != null) {
-			ret.setPolicyVersion(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getPolicyVersion());
 			ret.setPolicyUpdateTime(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getPolicyUpdateTime());
 			ret.setAuditMode(auditMode);
 			if (ret.getTagPolicies() != null) {
-				ret.getTagPolicies().setPolicyVersion(tagServiceVersionInfoDbObj == null ? null : tagServiceVersionInfoDbObj.getPolicyVersion());
 				ret.getTagPolicies().setPolicyUpdateTime(tagServiceVersionInfoDbObj == null ? null : tagServiceVersionInfoDbObj.getPolicyUpdateTime());
 				ret.getTagPolicies().setAuditMode(auditMode);
 			}
@@ -3131,6 +3129,8 @@ public class ServiceDBStore extends AbstractServiceStore {
 
 			List<RangerPolicyDelta> resourcePolicyDeltas;
 			List<RangerPolicyDelta> tagPolicyDeltas = null;
+			Long                    retrievedPolicyVersion = null;
+			Long                    retrievedTagPolicyVersion = null;
 
 			String componentServiceType = serviceDef.getName();
 
@@ -3140,7 +3140,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 			if (CollectionUtils.isNotEmpty(resourcePolicyDeltas)) {
 				isValid = RangerPolicyDeltaUtil.isValidDeltas(resourcePolicyDeltas, componentServiceType);
 
-				if (!isValid) {
+				if (isValid) {
+					retrievedPolicyVersion = resourcePolicyDeltas.get(resourcePolicyDeltas.size() - 1).getPoliciesVersion();
+				} else {
 					LOG.warn("Resource policy-Deltas :[" + resourcePolicyDeltas + "] from version :[" + lastKnownVersion + "] are not valid");
 				}
 
@@ -3154,7 +3156,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 
 						isValid = RangerPolicyDeltaUtil.isValidDeltas(tagPolicyDeltas, tagServiceType);
 
-						if (!isValid) {
+						if (isValid) {
+							retrievedTagPolicyVersion = tagPolicyDeltas.get(tagPolicyDeltas.size() - 1).getPoliciesVersion();
+						} else {
 							LOG.warn("Tag policy-Deltas :[" + tagPolicyDeltas + "] for service-version :[" + lastKnownVersion + "] and delta-id :[" + id + "] are not valid");
 						}
 					}
@@ -3167,7 +3171,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 
 						resourcePolicyDeltas.addAll(tagPolicyDeltas);
 					}
+
 					List<RangerPolicyDelta> compressedDeltas = compressDeltas(resourcePolicyDeltas);
+
 					if (compressedDeltas != null) {
 						ret = new ServicePolicies();
 						ret.setServiceId(service.getId());
@@ -3175,6 +3181,7 @@ public class ServiceDBStore extends AbstractServiceStore {
 						ret.setServiceDef(serviceDef);
 						ret.setPolicies(null);
 						ret.setPolicyDeltas(compressedDeltas);
+						ret.setPolicyVersion(retrievedPolicyVersion);
 
 						if (tagServiceDef != null && tagService != null) {
 							ServicePolicies.TagPolicies tagPolicies = new ServicePolicies.TagPolicies();
@@ -3182,6 +3189,7 @@ public class ServiceDBStore extends AbstractServiceStore {
 							tagPolicies.setServiceId(tagService.getId());
 							tagPolicies.setServiceName(tagService.getName());
 							tagPolicies.setPolicies(null);
+							tagPolicies.setPolicyVersion(retrievedTagPolicyVersion);
 							ret.setTagPolicies(tagPolicies);
 						}
 					} else {
diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java b/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
index 1176e0b..04aa472 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerServicePoliciesCache.java
@@ -35,6 +35,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -307,34 +308,28 @@ public class RangerServicePoliciesCache {
 						final List<RangerPolicy> policies = servicePolicies.getPolicies() == null ? new ArrayList<>() : servicePolicies.getPolicies();
 						final List<RangerPolicy> newPolicies = RangerPolicyDeltaUtil.applyDeltas(policies, servicePoliciesFromDb.getPolicyDeltas(), servicePolicies.getServiceDef().getName());
 
-						boolean isSanityCheckPassed = checkAndLoadCompleteSetOfPolicies(serviceName, serviceStore, newPolicies);
+						servicePolicies.setPolicies(newPolicies);
 
-						if (!isSanityCheckPassed) {
-							isCacheReloadedByDQEvent = true;
-						} else {
-							servicePolicies.setPolicies(newPolicies);
+						checkCacheSanity(serviceName, serviceStore, false);
 
-							// Rebuild tag-policies from original tag-policies and deltas
-							if (servicePoliciesFromDb.getTagPolicies() != null) {
-								if (LOG.isDebugEnabled()) {
-									LOG.debug("This service has associated tag service. Will compute tagPolicies from corresponding policy-deltas");
-								}
+						// Rebuild tag-policies from original tag-policies and deltas
+						if (servicePoliciesFromDb.getTagPolicies() != null) {
+							String tagServiceName = servicePoliciesFromDb.getTagPolicies().getServiceName();
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("This service has associated tag service:[" + tagServiceName + "]. Will compute tagPolicies from corresponding policy-deltas");
+							}
 
-								final List<RangerPolicy> tagPolicies = (servicePolicies.getTagPolicies() == null || CollectionUtils.isEmpty(servicePolicies.getTagPolicies().getPolicies())) ? new ArrayList<>() : servicePolicies.getTagPolicies().getPolicies();
-								final List<RangerPolicy> newTagPolicies = RangerPolicyDeltaUtil.applyDeltas(tagPolicies, servicePoliciesFromDb.getPolicyDeltas(), servicePoliciesFromDb.getTagPolicies().getServiceDef().getName());
+							final List<RangerPolicy> tagPolicies = (servicePolicies.getTagPolicies() == null || CollectionUtils.isEmpty(servicePolicies.getTagPolicies().getPolicies())) ? new ArrayList<>() : servicePolicies.getTagPolicies().getPolicies();
+							final List<RangerPolicy> newTagPolicies = RangerPolicyDeltaUtil.applyDeltas(tagPolicies, servicePoliciesFromDb.getPolicyDeltas(), servicePoliciesFromDb.getTagPolicies().getServiceDef().getName());
 
-								isSanityCheckPassed = checkAndLoadCompleteSetOfPolicies(serviceName, servicePoliciesFromDb.getTagPolicies().getServiceName(), serviceStore, newTagPolicies);
+							servicePolicies.getTagPolicies().setPolicies(newTagPolicies);
+							servicePolicies.getTagPolicies().setPolicyVersion(servicePoliciesFromDb.getTagPolicies().getPolicyVersion());
 
-								if (!isSanityCheckPassed) {
-									isCacheReloadedByDQEvent = true;
-								} else {
-									servicePolicies.getTagPolicies().setPolicies(newTagPolicies);
-								}
+							checkCacheSanity(servicePoliciesFromDb.getTagPolicies().getServiceName(), serviceStore, true);
 
-							} else {
-								if (LOG.isDebugEnabled()) {
-									LOG.debug("This service has no associated tag service");
-								}
+						} else {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("This service has no associated tag service");
 							}
 						}
 					}
@@ -361,50 +356,15 @@ public class RangerServicePoliciesCache {
 			return isCacheReloadedByDQEvent;
 		}
 
-		private boolean checkAndLoadCompleteSetOfPolicies(String serviceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
-			boolean ret = checkCacheSanity(serviceName, serviceStore, policiesFromIncrementalComputation);
-
-			if (!ret) {
-				loadCompleteSetOfPolicies(serviceName, serviceStore);
-				LOG.warn("Policies loaded from db for service:" + serviceName + "] are [" + servicePolicies + "]");
-				LOG.warn("Policies in cache for service:[" + serviceName + "] are [" + policiesFromIncrementalComputation + "]");
-
-			}
-
-			return ret;
-		}
-
-		private boolean checkAndLoadCompleteSetOfPolicies(String serviceName, String tagServiceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
-			boolean ret = checkCacheSanity(tagServiceName, serviceStore, policiesFromIncrementalComputation);
-
-			if (!ret) {
-				loadCompleteSetOfPolicies(serviceName, serviceStore);
-				LOG.warn("Policies loaded from db for service:" + serviceName + "] are [" + servicePolicies + "]");
-				LOG.warn("Tag Policies in cache for tag-service:[" + tagServiceName + "] are [" + policiesFromIncrementalComputation + "]");
-			}
+		private void checkCacheSanity(String serviceName, ServiceStore serviceStore, boolean isTagService) {
+			final boolean result;
+			Long dbPolicyVersion = serviceStore.getServicePolicyVersion(serviceName);
+			Long cachedPolicyVersion = isTagService ? servicePolicies.getTagPolicies().getPolicyVersion() : servicePolicies.getPolicyVersion();
 
-			return ret;
-		}
+			result = Objects.equals(dbPolicyVersion, cachedPolicyVersion);
 
-		private boolean checkCacheSanity(String serviceName, ServiceStore serviceStore, List<RangerPolicy> policiesFromIncrementalComputation) {
-			final boolean ret;
-			long dbPoliciesCount = serviceStore.getPoliciesCount(serviceName);
-			long cachedPoliciesCount = (CollectionUtils.isEmpty(policiesFromIncrementalComputation) ? 0 : policiesFromIncrementalComputation.size());
-			ret = dbPoliciesCount == cachedPoliciesCount;
-			if (!ret) {
-				LOG.warn("For service:[" + serviceName + "]: dbPoliciesCount:[" + dbPoliciesCount + "], cachedPoliciesCount:[" + cachedPoliciesCount + "]");
-			}
-			return ret;
-
-		}
-
-		private void loadCompleteSetOfPolicies(String serviceName, ServiceStore serviceStore) {
-			LOG.warn("Something went wrong doing incremental policies for service:[" + serviceName + "]!! Loading all policies!!");
-			try {
-				servicePolicies = serviceStore.getServicePolicies(serviceName, -1L);
-				pruneUnusedAttributes();
-			} catch (Exception ex) {
-				LOG.warn("Could not get policies from database");
+			if (!result) {
+				LOG.info("checkCacheSanity(serviceName=" + serviceName + "): policy cache has a different version than one in the database. However, changes from " + cachedPolicyVersion + " to " + dbPolicyVersion + " will be downloaded in the next download. policyVersionInDB=" + dbPolicyVersion + ", policyVersionInCache=" + cachedPolicyVersion);
 			}
 		}
 
diff --git a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
index 0a1d1c1..477129d 100644
--- a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
+++ b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
@@ -134,6 +134,7 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
 
                 Long    logRecordId      = (Long) log[POLICY_CHANGE_LOG_RECORD_ID_COLUMN_NUMBER];
                 Integer policyChangeType = (Integer) log[POLICY_CHANGE_LOG_RECORD_CHANGE_TYPE_COLUMN_NUMBER];
+                Long    policiesVersion  = (Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_VERSION_COLUMN_NUMBER];
                 String  serviceType      = (String) log[POLICY_CHANGE_LOG_RECORD_SERVICE_TYPE_COLUMN_NUMBER];
                 Long    policyId         = (Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_ID_COLUMN_NUMBER];
 
@@ -145,30 +146,29 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
                         } catch (Exception e) {
                             LOG.error("Cannot read policy:[" + policyId + "]. Should not have come here!! Offending log-record-id:[" + logRecordId + "] and returning...", e);
                             ret.clear();
-                            ret.add(new RangerPolicyDelta(logRecordId, RangerPolicyDelta.CHANGE_TYPE_LOG_ERROR, null));
+                            ret.add(new RangerPolicyDelta(logRecordId, RangerPolicyDelta.CHANGE_TYPE_LOG_ERROR, null, null));
                             break;
                         }
                     } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Policy:[" + policyId + "] not found - log-record - id:[" + logRecordId + "], PolicyChangeType:[" + policyChangeType + "]");
+                        LOG.warn("Policy:[" + policyId + "] not found - log-record - id:[" + logRecordId + "], PolicyChangeType:[" + policyChangeType + "]");
+                        if (policyChangeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE || policyChangeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE) {
+                            LOG.warn("Ignoring POLICY_CREATE or POLICY_UPDATE type change for policy-id:[" + policyId + "] as it was not found.. probably already deleted");
+                            continue;
+                        } else {
+                            // policyChangeType is DELETE
+                            policy = new RangerPolicy();
+                            policy.setId(policyId);
+                            policy.setServiceType(serviceType);
+                            policy.setPolicyType((Integer) log[POLICY_CHANGE_LOG_RECORD_POLICY_TYPE_COLUMN_NUMBER]);
+                            policy.setZoneName((String) log[POLICY_CHANGE_LOG_RECORD_ZONE_NAME_COLUMN_NUMBER]);
                         }
-
-                        // Create a dummy policy as the policy cannot be found - probably already deleted
-                        policy = new RangerPolicy();
-                        policy.setId(policyId);
-                        policy.setVersion((Long) log[POLICY_CHANGE_LOG_RECORD_POLICY_VERSION_COLUMN_NUMBER]);
-                        policy.setPolicyType((Integer) log[POLICY_CHANGE_LOG_RECORD_POLICY_TYPE_COLUMN_NUMBER]);
-                        policy.setZoneName((String) log[POLICY_CHANGE_LOG_RECORD_ZONE_NAME_COLUMN_NUMBER]);
                     }
-                    policy.setServiceType(serviceType);
 
-                    ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, policy));
+                    ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, policiesVersion, policy));
                 } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("policyId is null! log-record-id:[" + logRecordId + ", service-type:[" + log[POLICY_CHANGE_LOG_RECORD_SERVICE_TYPE_COLUMN_NUMBER] + "], policy-change-type:[" + log[POLICY_CHANGE_LOG_RECORD_CHANGE_TYPE_COLUMN_NUMBER] + "]");
-                    }
+                    LOG.info("delta-reset-event: log-record-id=" + logRecordId + "; service-type=" + serviceType + "; policy-change-type=" + policyChangeType + ". Discarding " + ret.size() + " deltas");
                     ret.clear();
-                    ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, null));
+                    ret.add(new RangerPolicyDelta(logRecordId, policyChangeType, null, null));
                     break;
                 }
             }