You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/02 15:35:28 UTC
cassandra git commit: Fix custom payload encoding/decoding to match
protocol spec
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 a30d8bd21 -> 6f93bd1f6
Fix custom payload encoding/decoding to match protocol spec
patch by blerer; reviewed by slebresne for CASSANDRA-9515
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f93bd1f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f93bd1f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f93bd1f
Branch: refs/heads/cassandra-2.2
Commit: 6f93bd1f65888104e33da2f9f01056b6115952e5
Parents: a30d8bd
Author: Benjamin Lerer <be...@datastax.com>
Authored: Tue Jun 2 15:34:31 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 2 15:34:31 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../CustomPayloadMirroringQueryHandler.java | 18 +++--
.../org/apache/cassandra/cql3/QueryHandler.java | 20 ++++--
.../apache/cassandra/cql3/QueryProcessor.java | 22 ++++--
.../org/apache/cassandra/transport/CBUtil.java | 18 ++---
.../org/apache/cassandra/transport/Message.java | 13 ++--
.../cassandra/transport/MessagePayloadTest.java | 73 ++++++++++++--------
7 files changed, 110 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f0ef51..db94c76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)
* ant test-all results incomplete when parsed (CASSANDRA-9463)
* Disallow frozen<> types in function arguments and return types for
clarity (CASSANDRA-9411)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
index 3930e9c..02a6df9 100644
--- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3;
+import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.cassandra.cql3.statements.BatchStatement;
@@ -34,14 +35,17 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
{
static QueryProcessor queryProcessor = QueryProcessor.instance;
- public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload)
+ public ResultMessage process(String query,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
{
ResultMessage result = queryProcessor.process(query, state, options, customPayload);
result.setCustomPayload(customPayload);
return result;
}
- public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload)
+ public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, ByteBuffer> customPayload)
{
ResultMessage.Prepared prepared = queryProcessor.prepare(query, state, customPayload);
prepared.setCustomPayload(customPayload);
@@ -58,14 +62,20 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
return queryProcessor.getPreparedForThrift(id);
}
- public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload)
+ public ResultMessage processPrepared(CQLStatement statement,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
{
ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload);
result.setCustomPayload(customPayload);
return result;
}
- public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload)
+ public ResultMessage processBatch(BatchStatement statement,
+ QueryState state,
+ BatchQueryOptions options,
+ Map<String, ByteBuffer> customPayload)
{
ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload);
result.setCustomPayload(customPayload);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 8b579d7..3c11c0e 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3;
+import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.cassandra.cql3.statements.BatchStatement;
@@ -29,15 +30,26 @@ import org.apache.cassandra.utils.MD5Digest;
public interface QueryHandler
{
- ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
+ ResultMessage process(String query,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
- ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException;
+ ResultMessage.Prepared prepare(String query,
+ QueryState state,
+ Map<String, ByteBuffer> customPayload) throws RequestValidationException;
ParsedStatement.Prepared getPrepared(MD5Digest id);
ParsedStatement.Prepared getPreparedForThrift(Integer id);
- ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
+ ResultMessage processPrepared(CQLStatement statement,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
- ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
+ ResultMessage processBatch(BatchStatement statement,
+ QueryState state,
+ BatchQueryOptions options,
+ Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 7b9261c..3170932 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -232,7 +232,11 @@ public class QueryProcessor implements QueryHandler
return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
}
- public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage process(String query,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
return process(query, state, options);
}
@@ -342,7 +346,9 @@ public class QueryProcessor implements QueryHandler
return UntypedResultSet.create(cqlRows);
}
- public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException
+ public ResultMessage.Prepared prepare(String query,
+ QueryState state,
+ Map<String, ByteBuffer> customPayload) throws RequestValidationException
{
return prepare(query, state);
}
@@ -422,7 +428,11 @@ public class QueryProcessor implements QueryHandler
}
}
- public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage processPrepared(CQLStatement statement,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
return processPrepared(statement, state, options);
}
@@ -450,7 +460,11 @@ public class QueryProcessor implements QueryHandler
return processStatement(statement, queryState, options);
}
- public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage processBatch(BatchStatement statement,
+ QueryState state,
+ BatchQueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
return processBatch(statement, state, options);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 48beea0..92e2891 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -183,36 +183,36 @@ public abstract class CBUtil
return 2 + bytes.length;
}
- public static Map<String, byte[]> readBytesMap(ByteBuf cb)
+ public static Map<String, ByteBuffer> readBytesMap(ByteBuf cb)
{
int length = cb.readUnsignedShort();
- Map<String, byte[]> m = new HashMap<>(length);
+ Map<String, ByteBuffer> m = new HashMap<>(length);
for (int i = 0; i < length; i++)
{
String k = readString(cb);
- byte[] v = readBytes(cb);
+ ByteBuffer v = readValue(cb);
m.put(k, v);
}
return m;
}
- public static void writeBytesMap(Map<String, byte[]> m, ByteBuf cb)
+ public static void writeBytesMap(Map<String, ByteBuffer> m, ByteBuf cb)
{
cb.writeShort(m.size());
- for (Map.Entry<String, byte[]> entry : m.entrySet())
+ for (Map.Entry<String, ByteBuffer> entry : m.entrySet())
{
writeString(entry.getKey(), cb);
- writeBytes(entry.getValue(), cb);
+ writeValue(entry.getValue(), cb);
}
}
- public static int sizeOfBytesMap(Map<String, byte[]> m)
+ public static int sizeOfBytesMap(Map<String, ByteBuffer> m)
{
int size = 2;
- for (Map.Entry<String, byte[]> entry : m.entrySet())
+ for (Map.Entry<String, ByteBuffer> entry : m.entrySet())
{
size += sizeOfString(entry.getKey());
- size += sizeOfBytes(entry.getValue());
+ size += sizeOfValue(entry.getValue());
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index b6d5a95..440d481 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
import java.util.ArrayList;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -148,7 +149,7 @@ public abstract class Message
protected Connection connection;
private int streamId;
private Frame sourceFrame;
- private Map<String, byte[]> customPayload;
+ private Map<String, ByteBuffer> customPayload;
protected Message(Type type)
{
@@ -186,12 +187,12 @@ public abstract class Message
return sourceFrame;
}
- public Map<String, byte[]> getCustomPayload()
+ public Map<String, ByteBuffer> getCustomPayload()
{
return customPayload;
}
- public void setCustomPayload(Map<String, byte[]> customPayload)
+ public void setCustomPayload(Map<String, ByteBuffer> customPayload)
{
this.customPayload = customPayload;
}
@@ -269,7 +270,7 @@ public abstract class Message
UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
List<String> warnings = isRequest || !hasWarning ? null : CBUtil.readStringList(frame.body);
- Map<String, byte[]> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body);
+ Map<String, ByteBuffer> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body);
try
{
@@ -329,7 +330,7 @@ public abstract class Message
if (message instanceof Response)
{
UUID tracingId = ((Response)message).getTracingId();
- Map<String, byte[]> customPayload = message.getCustomPayload();
+ Map<String, ByteBuffer> customPayload = message.getCustomPayload();
if (tracingId != null)
messageSize += CBUtil.sizeOfUUID(tracingId);
List<String> warnings = ((Response)message).getWarnings();
@@ -367,7 +368,7 @@ public abstract class Message
assert message instanceof Request;
if (((Request)message).isTracingRequested())
flags.add(Frame.Header.Flag.TRACING);
- Map<String, byte[]> payload = message.getCustomPayload();
+ Map<String, ByteBuffer> payload = message.getCustomPayload();
if (payload != null)
messageSize += CBUtil.sizeOfBytesMap(payload);
body = CBUtil.allocator.buffer(messageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 1049d63..73daa48 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -48,10 +48,12 @@ import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.MD5Digest;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
public class MessagePayloadTest extends CQLTester
{
- public static Map<String, byte[]> requestPayload;
- public static Map<String, byte[]> responsePayload;
+ public static Map<String, ByteBuffer> requestPayload;
+ public static Map<String, ByteBuffer> responsePayload;
private static Field cqlQueryHandlerField;
private static boolean modifiersAccessible;
@@ -125,8 +127,8 @@ public class MessagePayloadTest extends CQLTester
{
client.connect(false);
- Map<String, byte[]> reqMap;
- Map<String, byte[]> respMap;
+ Map<String, ByteBuffer> reqMap;
+ Map<String, ByteBuffer> respMap;
QueryMessage queryMessage = new QueryMessage(
"CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
@@ -134,23 +136,23 @@ public class MessagePayloadTest extends CQLTester
);
PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
- reqMap = Collections.singletonMap("foo", "42".getBytes());
- responsePayload = respMap = Collections.singletonMap("bar", "42".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(42));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(42));
queryMessage.setCustomPayload(reqMap);
Message.Response queryResponse = client.execute(queryMessage);
payloadEquals(reqMap, requestPayload);
payloadEquals(respMap, queryResponse.getCustomPayload());
- reqMap = Collections.singletonMap("foo", "43".getBytes());
- responsePayload = respMap = Collections.singletonMap("bar", "43".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(43));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(43));
prepareMessage.setCustomPayload(reqMap);
ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
payloadEquals(reqMap, requestPayload);
payloadEquals(respMap, prepareResponse.getCustomPayload());
ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
- reqMap = Collections.singletonMap("foo", "44".getBytes());
- responsePayload = respMap = Collections.singletonMap("bar", "44".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(44));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(44));
executeMessage.setCustomPayload(reqMap);
Message.Response executeResponse = client.execute(executeMessage);
payloadEquals(reqMap, requestPayload);
@@ -160,8 +162,8 @@ public class MessagePayloadTest extends CQLTester
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) VALUES (1, 'foo')"),
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
QueryOptions.DEFAULT);
- reqMap = Collections.singletonMap("foo", "45".getBytes());
- responsePayload = respMap = Collections.singletonMap("bar", "45".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(45));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(45));
batchMessage.setCustomPayload(reqMap);
Message.Response batchResponse = client.execute(batchMessage);
payloadEquals(reqMap, requestPayload);
@@ -194,7 +196,7 @@ public class MessagePayloadTest extends CQLTester
{
client.connect(false);
- Map<String, byte[]> reqMap;
+ Map<String, ByteBuffer> reqMap;
QueryMessage queryMessage = new QueryMessage(
"CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
@@ -202,8 +204,8 @@ public class MessagePayloadTest extends CQLTester
);
PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
- reqMap = Collections.singletonMap("foo", "42".getBytes());
- responsePayload = Collections.singletonMap("bar", "42".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(42));
+ responsePayload = Collections.singletonMap("bar", bytes(42));
queryMessage.setCustomPayload(reqMap);
try
{
@@ -217,8 +219,8 @@ public class MessagePayloadTest extends CQLTester
queryMessage.setCustomPayload(null);
client.execute(queryMessage);
- reqMap = Collections.singletonMap("foo", "43".getBytes());
- responsePayload = Collections.singletonMap("bar", "43".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(43));
+ responsePayload = Collections.singletonMap("bar", bytes(43));
prepareMessage.setCustomPayload(reqMap);
try
{
@@ -233,8 +235,8 @@ public class MessagePayloadTest extends CQLTester
ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
- reqMap = Collections.singletonMap("foo", "44".getBytes());
- responsePayload = Collections.singletonMap("bar", "44".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(44));
+ responsePayload = Collections.singletonMap("bar", bytes(44));
executeMessage.setCustomPayload(reqMap);
try
{
@@ -250,8 +252,8 @@ public class MessagePayloadTest extends CQLTester
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) VALUES (1, 'foo')"),
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
QueryOptions.DEFAULT);
- reqMap = Collections.singletonMap("foo", "45".getBytes());
- responsePayload = Collections.singletonMap("bar", "45".getBytes());
+ reqMap = Collections.singletonMap("foo", bytes(45));
+ responsePayload = Collections.singletonMap("bar", bytes(45));
batchMessage.setCustomPayload(reqMap);
try
{
@@ -274,13 +276,13 @@ public class MessagePayloadTest extends CQLTester
}
}
- private static void payloadEquals(Map<String, byte[]> map1, Map<String, byte[]> map2)
+ private static void payloadEquals(Map<String, ByteBuffer> map1, Map<String, ByteBuffer> map2)
{
Assert.assertNotNull(map1);
Assert.assertNotNull(map2);
Assert.assertEquals(map1.keySet(), map2.keySet());
- for (Map.Entry<String, byte[]> e : map1.entrySet())
- Assert.assertArrayEquals(e.getValue(), map2.get(e.getKey()));
+ for (Map.Entry<String, ByteBuffer> e : map1.entrySet())
+ Assert.assertEquals(e.getValue(), map2.get(e.getKey()));
}
public static class TestQueryHandler implements QueryHandler
@@ -295,7 +297,10 @@ public class MessagePayloadTest extends CQLTester
return QueryProcessor.instance.getPreparedForThrift(id);
}
- public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException
+ public ResultMessage.Prepared prepare(String query,
+ QueryState state,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestValidationException
{
if (customPayload != null)
requestPayload = customPayload;
@@ -308,7 +313,11 @@ public class MessagePayloadTest extends CQLTester
return result;
}
- public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage process(String query,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
if (customPayload != null)
requestPayload = customPayload;
@@ -321,7 +330,11 @@ public class MessagePayloadTest extends CQLTester
return result;
}
- public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage processBatch(BatchStatement statement,
+ QueryState state,
+ BatchQueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
if (customPayload != null)
requestPayload = customPayload;
@@ -334,7 +347,11 @@ public class MessagePayloadTest extends CQLTester
return result;
}
- public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+ public ResultMessage processPrepared(CQLStatement statement,
+ QueryState state,
+ QueryOptions options,
+ Map<String, ByteBuffer> customPayload)
+ throws RequestExecutionException, RequestValidationException
{
if (customPayload != null)
requestPayload = customPayload;