You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/09/10 01:59:13 UTC
[30/50] [abbrv] incubator-geode git commit: GEODE-222 Allow redis
adapter to handle live entry objects
GEODE-222 Allow redis adapter to handle live entry objects
Previously encoding a response would not be able to handle the case
where an entry was concurrently destroyed. The fix is to catch the
EntryDestroyedException gracefully
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d5ac2f97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d5ac2f97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d5ac2f97
Branch: refs/heads/feature/GEODE-12
Commit: d5ac2f97e97fbefe6efa710c6825f248f86ec975
Parents: e040750
Author: Vito Gavrilov <vg...@pivotal.io>
Authored: Thu Aug 27 10:54:44 2015 -0700
Committer: Vito Gavrilov <vg...@pivotal.io>
Committed: Thu Aug 27 10:54:44 2015 -0700
----------------------------------------------------------------------
.../gemstone/gemfire/internal/redis/Coder.java | 194 +++++++++++--------
1 file changed, 109 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d5ac2f97/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
index 9415cd3..0c35c93 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/redis/Coder.java
@@ -11,6 +11,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import com.gemstone.gemfire.cache.EntryDestroyedException;
import com.gemstone.gemfire.cache.query.Struct;
/**
@@ -168,29 +169,38 @@ public class Coder {
Iterator<Map.Entry<ByteArrayWrapper,ByteArrayWrapper>> it = items.iterator();
ByteBuf response = alloc.buffer();
response.writeByte(ARRAY_ID);
- response.writeBytes(intToBytes(items.size() * 2));
- response.writeBytes(CRLFar);
- try {
- while(it.hasNext()) {
- Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
- byte[] key = next.getKey().toBytes();
- byte[] nextByteArray = next.getValue().toBytes();
- response.writeByte(BULK_STRING_ID); // Add key
- response.writeBytes(intToBytes(key.length));
- response.writeBytes(CRLFar);
- response.writeBytes(key);
- response.writeBytes(CRLFar);
- response.writeByte(BULK_STRING_ID); // Add value
- response.writeBytes(intToBytes(nextByteArray.length));
- response.writeBytes(CRLFar);
- response.writeBytes(nextByteArray);
- response.writeBytes(CRLFar);
+ int size = 0;
+ ByteBuf tmp = alloc.buffer();
+ while(it.hasNext()) {
+ Map.Entry<ByteArrayWrapper,ByteArrayWrapper> next = it.next();
+ byte[] key;
+ byte[] nextByteArray;
+ try {
+ key = next.getKey().toBytes();
+ nextByteArray = next.getValue().toBytes();
+ } catch (EntryDestroyedException e) {
+ continue;
}
- } catch(Exception e) {
- return null;
+ tmp.writeByte(BULK_STRING_ID); // Add key
+ tmp.writeBytes(intToBytes(key.length));
+ tmp.writeBytes(CRLFar);
+ tmp.writeBytes(key);
+ tmp.writeBytes(CRLFar);
+ tmp.writeByte(BULK_STRING_ID); // Add value
+ tmp.writeBytes(intToBytes(nextByteArray.length));
+ tmp.writeBytes(CRLFar);
+ tmp.writeBytes(nextByteArray);
+ tmp.writeBytes(CRLFar);
+ size++;
}
+ response.writeBytes(intToBytes(size*2));
+ response.writeBytes(CRLFar);
+ response.writeBytes(tmp);
+
+ tmp.release();
+
return response;
}
@@ -211,27 +221,23 @@ public class Coder {
response.writeBytes(intToBytes(items.size()));
response.writeBytes(CRLFar);
- try {
- while(it.hasNext()) {
- Object nextObject = it.next();
- if (nextObject instanceof String) {
- String next = (String) nextObject;
- response.writeByte(BULK_STRING_ID);
- response.writeBytes(intToBytes(next.length()));
- response.writeBytes(CRLFar);
- response.writeBytes(stringToBytes(next));
- response.writeBytes(CRLFar);
- } else if (nextObject instanceof ByteArrayWrapper) {
- byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
- response.writeByte(BULK_STRING_ID);
- response.writeBytes(intToBytes(next.length));
- response.writeBytes(CRLFar);
- response.writeBytes(next);
- response.writeBytes(CRLFar);
- }
+ while(it.hasNext()) {
+ Object nextObject = it.next();
+ if (nextObject instanceof String) {
+ String next = (String) nextObject;
+ response.writeByte(BULK_STRING_ID);
+ response.writeBytes(intToBytes(next.length()));
+ response.writeBytes(CRLFar);
+ response.writeBytes(stringToBytes(next));
+ response.writeBytes(CRLFar);
+ } else if (nextObject instanceof ByteArrayWrapper) {
+ byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
+ response.writeByte(BULK_STRING_ID);
+ response.writeBytes(intToBytes(next.length));
+ response.writeBytes(CRLFar);
+ response.writeBytes(next);
+ response.writeBytes(CRLFar);
}
- } catch (Exception e) {
- return null;
}
return response;
}
@@ -260,7 +266,7 @@ public class Coder {
response.writeBytes(CRLFar);
return response;
}
-
+
public static final ByteBuf getNoAuthResponse(ByteBufAllocator alloc, String error) {
byte[] errorAr = stringToBytes(error);
ByteBuf response = alloc.buffer(errorAr.length + 25);
@@ -306,26 +312,38 @@ public class Coder {
Iterator<?> it = items.iterator();
ByteBuf response = alloc.buffer();
response.writeByte(Coder.ARRAY_ID);
- response.writeBytes(intToBytes(items.size()));
- response.writeBytes(Coder.CRLFar);
-
+ ByteBuf tmp = alloc.buffer();
+ int size = 0;
while(it.hasNext()) {
Object next = it.next();
ByteArrayWrapper nextWrapper = null;
- if (next instanceof Entry)
- nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
- else if (next instanceof Struct)
+ if (next instanceof Entry) {
+ try {
+ nextWrapper = (ByteArrayWrapper) ((Entry<?, ?>) next).getValue();
+ } catch (EntryDestroyedException e) {
+ continue;
+ }
+ } else if (next instanceof Struct) {
nextWrapper = (ByteArrayWrapper) ((Struct) next).getFieldValues()[1];
+ }
if (nextWrapper != null) {
- response.writeByte(Coder.BULK_STRING_ID);
- response.writeBytes(intToBytes(nextWrapper.length()));
- response.writeBytes(Coder.CRLFar);
- response.writeBytes(nextWrapper.toBytes());
- response.writeBytes(Coder.CRLFar);
+ tmp.writeByte(Coder.BULK_STRING_ID);
+ tmp.writeBytes(intToBytes(nextWrapper.length()));
+ tmp.writeBytes(Coder.CRLFar);
+ tmp.writeBytes(nextWrapper.toBytes());
+ tmp.writeBytes(Coder.CRLFar);
} else {
- response.writeBytes(Coder.bNIL);
+ tmp.writeBytes(Coder.bNIL);
}
+ size++;
}
+
+ response.writeBytes(intToBytes(size));
+ response.writeBytes(Coder.CRLFar);
+ response.writeBytes(tmp);
+
+ tmp.release();
+
return response;
}
@@ -335,43 +353,49 @@ public class Coder {
ByteBuf buffer = alloc.buffer();
buffer.writeByte(Coder.ARRAY_ID);
- if (!withScores)
- buffer.writeBytes(intToBytes(list.size()));
- else
- buffer.writeBytes(intToBytes(2 * list.size()));
- buffer.writeBytes(Coder.CRLFar);
-
- try {
- for(Object entry: list) {
- ByteArrayWrapper key;
- DoubleWrapper score;
- if (entry instanceof Entry) {
+ ByteBuf tmp = alloc.buffer();
+ int size = 0;
+
+ for(Object entry: list) {
+ ByteArrayWrapper key;
+ DoubleWrapper score;
+ if (entry instanceof Entry) {
+ try {
key = (ByteArrayWrapper) ((Entry<?, ?>) entry).getKey();
- score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();;
- } else {
- Object[] fieldVals = ((Struct) entry).getFieldValues();
- key = (ByteArrayWrapper) fieldVals[0];
- score = (DoubleWrapper) fieldVals[1];
- }
- byte[] byteAr = key.toBytes();
- buffer.writeByte(Coder.BULK_STRING_ID);
- buffer.writeBytes(intToBytes(byteAr.length));
- buffer.writeBytes(Coder.CRLFar);
- buffer.writeBytes(byteAr);
- buffer.writeBytes(Coder.CRLFar);
- if (withScores) {
- String scoreString = score.toString();
- byte[] scoreAr = stringToBytes(scoreString);
- buffer.writeByte(Coder.BULK_STRING_ID);
- buffer.writeBytes(intToBytes(scoreString.length()));
- buffer.writeBytes(Coder.CRLFar);
- buffer.writeBytes(scoreAr);
- buffer.writeBytes(Coder.CRLFar);
+ score = (DoubleWrapper) ((Entry<?, ?>) entry).getValue();
+ } catch (EntryDestroyedException e) {
+ continue;
}
+ } else {
+ Object[] fieldVals = ((Struct) entry).getFieldValues();
+ key = (ByteArrayWrapper) fieldVals[0];
+ score = (DoubleWrapper) fieldVals[1];
+ }
+ byte[] byteAr = key.toBytes();
+ tmp.writeByte(Coder.BULK_STRING_ID);
+ tmp.writeBytes(intToBytes(byteAr.length));
+ tmp.writeBytes(Coder.CRLFar);
+ tmp.writeBytes(byteAr);
+ tmp.writeBytes(Coder.CRLFar);
+ size++;
+ if (withScores) {
+ String scoreString = score.toString();
+ byte[] scoreAr = stringToBytes(scoreString);
+ tmp.writeByte(Coder.BULK_STRING_ID);
+ tmp.writeBytes(intToBytes(scoreString.length()));
+ tmp.writeBytes(Coder.CRLFar);
+ tmp.writeBytes(scoreAr);
+ tmp.writeBytes(Coder.CRLFar);
+ size++;
}
- } catch(Exception e) {
- return null;
}
+
+ buffer.writeBytes(intToBytes(size));
+ buffer.writeBytes(Coder.CRLFar);
+ buffer.writeBytes(tmp);
+
+ tmp.release();
+
return buffer;
}