You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/09/10 01:59:13 UTC

[30/50] [abbrv] incubator-geode git commit: GEODE-222 Allow redis adapter to handle live entry objects

GEODE-222 Allow redis adapter to handle live entry objects

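Previously, encoding a response could not handle the case where an
entry was concurrently destroyed. The fix is to catch the
EntryDestroyedException gracefully: the destroyed entry is skipped and
the array length written to the response counts only the entries that
were actually encoded.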


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d5ac2f97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d5ac2f97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d5ac2f97

Branch: refs/heads/feature/GEODE-12
Commit: d5ac2f97e97fbefe6efa710c6825f248f86ec975
Parents: e040750
Author: Vito Gavrilov <vg...@pivotal.io>
Authored: Thu Aug 27 10:54:44 2015 -0700
Committer: Vito Gavrilov <vg...@pivotal.io>
Committed: Thu Aug 27 10:54:44 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/redis/Coder.java  | 194 +++++++++++--------
 1 file changed, 109 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d5ac2f97/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
index 9415cd3..0c35c93 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
@@ -11,6 +11,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.gemstone.gemfire.cache.EntryDestroyedException;
 import com.gemstone.gemfire.cache.query.Struct;
 
 /**
@@ -168,29 +169,38 @@ public class Coder {
     Iterator<Map.Entry<ByteArrayWrapper,ByteArrayWrapper>> it = items.iterator();
     ByteBuf response = alloc.buffer();
     response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(items.size() * 2));
-    response.writeBytes(CRLFar);
 
-    try {
-      while(it.hasNext()) {
-        Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
-        byte[] key = next.getKey().toBytes();
-        byte[] nextByteArray = next.getValue().toBytes();
-        response.writeByte(BULK_STRING_ID); // Add key
-        response.writeBytes(intToBytes(key.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(key);
-        response.writeBytes(CRLFar);
-        response.writeByte(BULK_STRING_ID); // Add value
-        response.writeBytes(intToBytes(nextByteArray.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(nextByteArray);
-        response.writeBytes(CRLFar);
+    int size = 0;
+    ByteBuf tmp = alloc.buffer();
+    while(it.hasNext()) {
+      Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
+      byte[] key;
+      byte[] nextByteArray;
+      try {
+        key = next.getKey().toBytes();
+        nextByteArray = next.getValue().toBytes();
+      } catch (EntryDestroyedException e) {
+        continue;
       }
-    } catch(Exception e) {
-      return null;
+      tmp.writeByte(BULK_STRING_ID); // Add key
+      tmp.writeBytes(intToBytes(key.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(key);
+      tmp.writeBytes(CRLFar);
+      tmp.writeByte(BULK_STRING_ID); // Add value
+      tmp.writeBytes(intToBytes(nextByteArray.length));
+      tmp.writeBytes(CRLFar);
+      tmp.writeBytes(nextByteArray);
+      tmp.writeBytes(CRLFar);
+      size++;
     }
 
+    response.writeBytes(intToBytes(size*2));
+    response.writeBytes(CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
     return response;
   }
 
@@ -211,27 +221,23 @@ public class Coder {
     response.writeBytes(intToBytes(items.size()));
     response.writeBytes(CRLFar);
 
-    try {
-      while(it.hasNext()) {
-        Object nextObject = it.next();
-        if (nextObject instanceof String) {
-          String next = (String) nextObject;
-          response.writeByte(BULK_STRING_ID);
-          response.writeBytes(intToBytes(next.length()));
-          response.writeBytes(CRLFar);
-          response.writeBytes(stringToBytes(next));
-          response.writeBytes(CRLFar);
-        } else if (nextObject instanceof ByteArrayWrapper) {
-          byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
-          response.writeByte(BULK_STRING_ID);
-          response.writeBytes(intToBytes(next.length));
-          response.writeBytes(CRLFar);
-          response.writeBytes(next);
-          response.writeBytes(CRLFar);
-        }
+    while(it.hasNext()) {
+      Object nextObject = it.next();
+      if (nextObject instanceof String) {
+        String next = (String) nextObject;
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length()));
+        response.writeBytes(CRLFar);
+        response.writeBytes(stringToBytes(next));
+        response.writeBytes(CRLFar);
+      } else if (nextObject instanceof ByteArrayWrapper) {
+        byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
+        response.writeByte(BULK_STRING_ID);
+        response.writeBytes(intToBytes(next.length));
+        response.writeBytes(CRLFar);
+        response.writeBytes(next);
+        response.writeBytes(CRLFar);
       }
-    } catch (Exception e) {
-      return null;
     }
     return response;
   }
@@ -260,7 +266,7 @@ public class Coder {
     response.writeBytes(CRLFar);
     return response;
   }
-  
+
   public static final ByteBuf getNoAuthResponse(ByteBufAllocator alloc, String error) {
     byte[] errorAr = stringToBytes(error);
     ByteBuf response = alloc.buffer(errorAr.length + 25);
@@ -306,26 +312,38 @@ public class Coder {
     Iterator<?> it = items.iterator();
     ByteBuf response = alloc.buffer();
     response.writeByte(Coder.ARRAY_ID);
-    response.writeBytes(intToBytes(items.size()));
-    response.writeBytes(Coder.CRLFar);
-
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
     while(it.hasNext()) {
       Object next = it.next();
       ByteArrayWrapper nextWrapper = null;
-      if (next instanceof Entry)
-        nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
-      else if (next instanceof Struct)
+      if (next instanceof Entry) {
+        try {
+          nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
+        }
+      } else if (next instanceof Struct) {
         nextWrapper = (ByteArrayWrapper) ((Struct) next).getFieldValues()[1];
+      }
       if (nextWrapper != null) {
-        response.writeByte(Coder.BULK_STRING_ID);
-        response.writeBytes(intToBytes(nextWrapper.length()));
-        response.writeBytes(Coder.CRLFar);
-        response.writeBytes(nextWrapper.toBytes());
-        response.writeBytes(Coder.CRLFar);
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(nextWrapper.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(nextWrapper.toBytes());
+        tmp.writeBytes(Coder.CRLFar);
       } else {
-        response.writeBytes(Coder.bNIL);
+        tmp.writeBytes(Coder.bNIL);
       }
+      size++;
     }
+
+    response.writeBytes(intToBytes(size));
+    response.writeBytes(Coder.CRLFar);
+    response.writeBytes(tmp);
+
+    tmp.release();
+
     return response;
   }
 
@@ -335,43 +353,49 @@ public class Coder {
 
     ByteBuf buffer = alloc.buffer();
     buffer.writeByte(Coder.ARRAY_ID);
-    if (!withScores)
-      buffer.writeBytes(intToBytes(list.size()));
-    else
-      buffer.writeBytes(intToBytes(2 * list.size()));
-    buffer.writeBytes(Coder.CRLFar);
-
-    try {
-      for(Object entry: list) {
-        ByteArrayWrapper key;
-        DoubleWrapper score;
-        if (entry instanceof Entry) {
+    ByteBuf tmp = alloc.buffer();
+    int size = 0;
+
+    for(Object entry: list) {
+      ByteArrayWrapper key;
+      DoubleWrapper score;
+      if (entry instanceof Entry) {
+        try {
           key = (ByteArrayWrapper) ((Entry<?, ?>) entry).getKey();
-          score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();;
-        } else {
-          Object[] fieldVals = ((Struct) entry).getFieldValues();
-          key = (ByteArrayWrapper) fieldVals[0];
-          score = (DoubleWrapper) fieldVals[1];
-        }
-        byte[] byteAr = key.toBytes();
-        buffer.writeByte(Coder.BULK_STRING_ID);
-        buffer.writeBytes(intToBytes(byteAr.length));
-        buffer.writeBytes(Coder.CRLFar);
-        buffer.writeBytes(byteAr);
-        buffer.writeBytes(Coder.CRLFar);
-        if (withScores) {
-          String scoreString = score.toString();
-          byte[] scoreAr = stringToBytes(scoreString);
-          buffer.writeByte(Coder.BULK_STRING_ID);
-          buffer.writeBytes(intToBytes(scoreString.length()));
-          buffer.writeBytes(Coder.CRLFar);
-          buffer.writeBytes(scoreAr);
-          buffer.writeBytes(Coder.CRLFar);
+          score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();
+        } catch (EntryDestroyedException e) {
+          continue;
         }
+      } else {
+        Object[] fieldVals = ((Struct) entry).getFieldValues();
+        key = (ByteArrayWrapper) fieldVals[0];
+        score = (DoubleWrapper) fieldVals[1];
+      }
+      byte[] byteAr = key.toBytes();
+      tmp.writeByte(Coder.BULK_STRING_ID);
+      tmp.writeBytes(intToBytes(byteAr.length));
+      tmp.writeBytes(Coder.CRLFar);
+      tmp.writeBytes(byteAr);
+      tmp.writeBytes(Coder.CRLFar);
+      size++;
+      if (withScores) {
+        String scoreString = score.toString();
+        byte[] scoreAr = stringToBytes(scoreString);
+        tmp.writeByte(Coder.BULK_STRING_ID);
+        tmp.writeBytes(intToBytes(scoreString.length()));
+        tmp.writeBytes(Coder.CRLFar);
+        tmp.writeBytes(scoreAr);
+        tmp.writeBytes(Coder.CRLFar);
+        size++;
       }
-    } catch(Exception e) {
-      return null;
     }
+
+    buffer.writeBytes(intToBytes(size));
+    buffer.writeBytes(Coder.CRLFar);
+    buffer.writeBytes(tmp);
+
+    tmp.release();
+
     return buffer;
   }