You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/03/11 02:35:45 UTC

[GitHub] [geode] nabarunnag commented on a change in pull request #6104: GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type

nabarunnag commented on a change in pull request #6104:
URL: https://github.com/apache/geode/pull/6104#discussion_r592022618



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
##########
@@ -814,6 +817,23 @@ protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.putAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }

Review comment:
       ```suggestion
     void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq, int length) {
       for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
         Long cqID = e.getKey();
         // For the CQs satisfying the event with destroy CQEvent, remove
         // the entry from CQ cache.
         for (int i = 0; i < length; i++) {
           @Unretained
           EntryEventImpl entryEvent = getEventForPosition(i);
           if (entryEvent != null && entryEvent.getKey() != null && cq != null
               && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
               && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
             cq.removeFromCqResultKeys(entryEvent.getKey(), true);
           }
         }
       }
     }
   ```

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
##########
@@ -814,6 +817,23 @@ protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.putAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }

Review comment:
       Maybe this method can be put in a common class and reused in DistributedRemoveAllOperation and DistributedPutAllOperations




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org