You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/10 22:06:24 UTC
[5/6] cassandra git commit: Drop support for protocol v1 and v2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 8992aed..d65e3a6 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -22,23 +22,18 @@ import java.io.IOException;
import java.util.List;
import java.util.Iterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.Lists;
import org.apache.cassandra.cql3.Maps;
import org.apache.cassandra.cql3.Sets;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -48,10 +43,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public abstract class CollectionType<T> extends AbstractType<T>
{
- private static final Logger logger = LoggerFactory.getLogger(CollectionType.class);
-
- public static final int MAX_ELEMENTS = 65535;
-
public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer();
public enum Kind
@@ -148,26 +139,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
return values.size();
}
- protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version)
- {
- assert isMultiCell();
-
- int size = collectionSize(values);
- if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS)
- return size;
-
- logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" +
- " elements will be returned to the client. Please see " +
- "http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.",
- def.ksName, def.cfName, values.size(), MAX_ELEMENTS, MAX_ELEMENTS);
- return MAX_ELEMENTS;
- }
-
public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version)
{
assert isMultiCell();
List<ByteBuffer> values = serializedValues(cells);
- int size = enforceLimit(def, values, version);
+ int size = collectionSize(values);
return CollectionSerializer.pack(values, size, version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 5fb3e0a..3d6be67 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -50,11 +50,6 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
return deserializeForNativeProtocol(bytes, Server.VERSION_3);
}
- public ByteBuffer reserializeToV3(ByteBuffer bytes)
- {
- return serialize(deserializeForNativeProtocol(bytes, 2));
- }
-
public void validate(ByteBuffer bytes) throws MarshalException
{
// Same thing as above
@@ -76,69 +71,42 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
{
- if (version >= Server.VERSION_3)
output.putInt(elements);
- else
- output.putShort((short)elements);
}
public static int readCollectionSize(ByteBuffer input, int version)
{
- return version >= Server.VERSION_3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+ return input.getInt();
}
protected static int sizeOfCollectionSize(int elements, int version)
{
- return version >= Server.VERSION_3 ? 4 : 2;
+ return 4;
}
public static void writeValue(ByteBuffer output, ByteBuffer value, int version)
{
- if (version >= Server.VERSION_3)
+ if (value == null)
{
- if (value == null)
- {
- output.putInt(-1);
- return;
- }
-
- output.putInt(value.remaining());
- output.put(value.duplicate());
- }
- else
- {
- assert value != null;
- output.putShort((short)value.remaining());
- output.put(value.duplicate());
+ output.putInt(-1);
+ return;
}
+
+ output.putInt(value.remaining());
+ output.put(value.duplicate());
}
public static ByteBuffer readValue(ByteBuffer input, int version)
{
- if (version >= Server.VERSION_3)
- {
- int size = input.getInt();
- if (size < 0)
- return null;
+ int size = input.getInt();
+ if (size < 0)
+ return null;
- return ByteBufferUtil.readBytes(input, size);
- }
- else
- {
- return ByteBufferUtil.readBytesWithShortLength(input);
- }
+ return ByteBufferUtil.readBytes(input, size);
}
public static int sizeOfValue(ByteBuffer value, int version)
{
- if (version >= Server.VERSION_3)
- {
- return value == null ? 4 : 4 + value.remaining();
- }
- else
- {
- assert value != null;
- return 2 + value.remaining();
- }
+ return value == null ? 4 : 4 + value.remaining();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 9cd1653..14cd812 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -2286,7 +2286,7 @@ public class CassandraServer implements Cassandra.Iface
ThriftClientState cState = state();
return ClientState.getCQLQueryHandler().process(queryString,
cState.getQueryState(),
- QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel),
+ QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel),
Collections.<ByteBuffer>emptyList()),
null).toThriftResult();
}
@@ -2358,7 +2358,7 @@ public class CassandraServer implements Cassandra.Iface
return ClientState.getCQLQueryHandler().processPrepared(prepared.statement,
cState.getQueryState(),
- QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables),
+ QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables),
null).toThriftResult();
}
catch (RequestExecutionException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 9b6fdd4..b7f4650 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
@@ -29,9 +28,9 @@ import io.netty.buffer.ByteBuf;
public abstract class Event
{
public enum Type {
- TOPOLOGY_CHANGE(Server.VERSION_1),
- STATUS_CHANGE(Server.VERSION_1),
- SCHEMA_CHANGE(Server.VERSION_1),
+ TOPOLOGY_CHANGE(Server.VERSION_3),
+ STATUS_CHANGE(Server.VERSION_3),
+ SCHEMA_CHANGE(Server.VERSION_3),
TRACE_COMPLETE(Server.VERSION_4);
public final int minimumVersion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 14fe589..04cc95e 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -27,7 +27,6 @@ import io.netty.channel.*;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -50,16 +49,6 @@ public class Frame
* +---------+---------+---------+---------+---------+
* | length |
* +---------+---------+---------+---------+
- *
- *
- * In versions 1 and 2 the header has a smaller (1 byte) stream id, and is thus defined the following way:
- *
- * 0 8 16 24 32
- * +---------+---------+---------+---------+
- * | version | flags | stream | opcode |
- * +---------+---------+---------+---------+
- * | length |
- * +---------+---------+---------+---------+
*/
private Frame(Header header, ByteBuf body)
{
@@ -85,9 +74,8 @@ public class Frame
public static class Header
{
- // 8 bytes in protocol versions 1 and 2, 8 bytes in protocol version 3 and later
- public static final int MODERN_LENGTH = 9;
- public static final int LEGACY_LENGTH = 8;
+ // 9 bytes in protocol version 3 and later
+ public static final int LENGTH = 9;
public static final int BODY_LENGTH_SIZE = 4;
@@ -174,8 +162,8 @@ public class Frame
return;
}
- // Wait until we have read at least the short header
- if (buffer.readableBytes() < Header.LEGACY_LENGTH)
+ // Wait until we have the complete header
+ if (buffer.readableBytes() < Header.LENGTH)
return;
int idx = buffer.readerIndex();
@@ -184,29 +172,14 @@ public class Frame
Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
int version = firstByte & PROTOCOL_VERSION_MASK;
- if (version > Server.CURRENT_VERSION)
- throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); highest supported is %d ",
- version, Server.CURRENT_VERSION));
-
- // Wait until we have the complete V3+ header
- if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH)
- return;
+ if (version < Server.MIN_SUPPORTED_VERSION || version > Server.CURRENT_VERSION)
+ throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d",
+ version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION));
int flags = buffer.getByte(idx++);
- int streamId, headerLength;
- if (version >= Server.VERSION_3)
- {
- streamId = buffer.getShort(idx);
- idx += 2;
- headerLength = Header.MODERN_LENGTH;
- }
- else
- {
- streamId = buffer.getByte(idx);
- idx++;
- headerLength = Header.LEGACY_LENGTH;
- }
+ int streamId = buffer.getShort(idx);
+ idx += 2;
// This throws a protocol exception if the opcode is unknown
Message.Type type;
@@ -222,13 +195,7 @@ public class Frame
long bodyLength = buffer.getUnsignedInt(idx);
idx += Header.BODY_LENGTH_SIZE;
- if (bodyLength < 0)
- {
- buffer.skipBytes(headerLength);
- throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId);
- }
-
- long frameLength = bodyLength + headerLength;
+ long frameLength = bodyLength + Header.LENGTH;
if (frameLength > MAX_FRAME_LENGTH)
{
// Enter the discard mode and discard everything received so far.
@@ -295,10 +262,7 @@ public class Frame
public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
throws IOException
{
- int headerLength = frame.header.version >= Server.VERSION_3
- ? Header.MODERN_LENGTH
- : Header.LEGACY_LENGTH;
- ByteBuf header = CBUtil.allocator.buffer(headerLength);
+ ByteBuf header = CBUtil.allocator.buffer(Header.LENGTH);
Message.Type type = frame.header.type;
header.writeByte(type.direction.addToVersion(frame.header.version));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index cafc0ce..a9c9bee 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -63,11 +63,10 @@ public class Server implements CassandraDaemon.Server
private static final Logger logger = LoggerFactory.getLogger(Server.class);
private static final boolean useEpoll = NativeTransportService.useEpoll();
- public static final int VERSION_1 = 1;
- public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
public static final int VERSION_4 = 4;
public static final int CURRENT_VERSION = VERSION_4;
+ public static final int MIN_SUPPORTED_VERSION = VERSION_3;
private final ConnectionTracker connectionTracker = new ConnectionTracker();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 2db380b..5baf1a6 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -45,9 +45,6 @@ public class BatchMessage extends Message.Request
{
public BatchMessage decode(ByteBuf body, int version)
{
- if (version == 1)
- throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
-
byte type = body.readByte();
int n = body.readUnsignedShort();
List<Object> queryOrIds = new ArrayList<>(n);
@@ -63,9 +60,7 @@ public class BatchMessage extends Message.Request
throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
variables.add(CBUtil.readValueList(body, version));
}
- QueryOptions options = version < 3
- ? QueryOptions.fromPreV3Batch(CBUtil.readConsistencyLevel(body))
- : QueryOptions.codec.decode(body, version);
+ QueryOptions options = QueryOptions.codec.decode(body, version);
return new BatchMessage(toType(type), queryOrIds, variables, options);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 718595c..940a0fc 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.transport.messages;
-import java.nio.ByteBuffer;
-import java.util.List;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
@@ -28,7 +26,6 @@ import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
@@ -45,16 +42,7 @@ public class ExecuteMessage extends Message.Request
public ExecuteMessage decode(ByteBuf body, int version)
{
byte[] id = CBUtil.readBytes(body);
- if (version == 1)
- {
- List<ByteBuffer> values = CBUtil.readValueList(body, version);
- ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.fromProtocolV1(consistency, values));
- }
- else
- {
- return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version));
- }
+ return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version));
}
public void encode(ExecuteMessage msg, ByteBuf dest, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 67f3734..3b48d52 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -17,20 +17,20 @@
*/
package org.apache.cassandra.transport.messages;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.UUIDGen;
@@ -44,15 +44,7 @@ public class QueryMessage extends Message.Request
public QueryMessage decode(ByteBuf body, int version)
{
String query = CBUtil.readLongString(body);
- if (version == 1)
- {
- ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- return new QueryMessage(query, QueryOptions.fromProtocolV1(consistency, Collections.<ByteBuffer>emptyList()));
- }
- else
- {
- return new QueryMessage(query, QueryOptions.codec.decode(body, version));
- }
+ return new QueryMessage(query, QueryOptions.codec.decode(body, version));
}
public void encode(QueryMessage msg, ByteBuf dest, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 3d3729a..25dfc28 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -32,12 +32,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.datastax.driver.core.*;
import com.datastax.driver.core.ResultSet;
+
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+
import org.junit.*;
+
+import com.datastax.driver.core.Cluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
@@ -52,7 +56,6 @@ import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.service.ClientState;
@@ -62,7 +65,6 @@ import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
-
import static junit.framework.Assert.assertNotNull;
/**
@@ -81,30 +83,29 @@ public abstract class CQLTester
private static org.apache.cassandra.transport.Server server;
protected static final int nativePort;
protected static final InetAddress nativeAddr;
- private static final Cluster[] cluster;
- private static final Session[] session;
+ private static final Map<Integer, Cluster> clusters = new HashMap<>();
+ private static final Map<Integer, Session> sessions = new HashMap<>();
private static boolean isServerPrepared = false;
- public static int maxProtocolVersion;
+ public static final List<Integer> PROTOCOL_VERSIONS;
static
{
- int version;
- for (version = 1; version <= Server.CURRENT_VERSION; )
+ // The latest versions might not be supported yet by the java driver
+ ImmutableList.Builder<Integer> builder = ImmutableList.builder();
+ for (int version = Server.MIN_SUPPORTED_VERSION; version <= Server.CURRENT_VERSION; version++)
{
try
{
- ProtocolVersion.fromInt(++version);
+ ProtocolVersion.fromInt(version);
+ builder.add(version);
}
catch (IllegalArgumentException e)
{
- version--;
break;
}
}
- maxProtocolVersion = version;
- cluster = new Cluster[maxProtocolVersion];
- session = new Session[maxProtocolVersion];
+ PROTOCOL_VERSIONS = builder.build();
// Once per-JVM is enough
prepareServer(true);
@@ -227,11 +228,9 @@ public abstract class CQLTester
@AfterClass
public static void tearDownClass()
{
- for (Session sess : session)
- if (sess != null)
+ for (Session sess : sessions.values())
sess.close();
- for (Cluster cl : cluster)
- if (cl != null)
+ for (Cluster cl : clusters.values())
cl.close();
if (server != null)
@@ -319,17 +318,19 @@ public abstract class CQLTester
server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
server.start();
- for (int version = 1; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
- if (cluster[version-1] != null)
+ if (clusters.containsKey(version))
continue;
- cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr)
- .withClusterName("Test Cluster")
- .withPort(nativePort)
- .withProtocolVersion(ProtocolVersion.fromInt(version))
- .build();
- session[version-1] = cluster[version-1].connect();
+ Cluster cluster = Cluster.builder()
+ .addContactPoints(nativeAddr)
+ .withClusterName("Test Cluster")
+ .withPort(nativePort)
+ .withProtocolVersion(ProtocolVersion.fromInt(version))
+ .build();
+ clusters.put(version, cluster);
+ sessions.put(version, cluster.connect());
logger.info("Started Java Driver instance for protocol version {}", version);
}
@@ -623,16 +624,19 @@ public abstract class CQLTester
protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
{
- requireNetwork();
+ return sessionNet(protocolVersion).execute(formatQuery(query), values);
+ }
- return session[protocolVersion-1].execute(formatQuery(query), values);
+ protected Session sessionNet()
+ {
+ return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
}
protected Session sessionNet(int protocolVersion)
{
requireNetwork();
- return session[protocolVersion-1];
+ return sessions.get(protocolVersion);
}
private String formatQuery(String query)
@@ -696,9 +700,9 @@ public abstract class CQLTester
for (int j = 0; j < meta.size(); j++)
{
DataType type = meta.getType(j);
- com.datastax.driver.core.TypeCodec<Object> codec = cluster[protocolVersion -1].getConfiguration()
- .getCodecRegistry()
- .codecFor(type);
+ com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration()
+ .getCodecRegistry()
+ .codecFor(type);
ByteBuffer expectedByteValue = codec.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
int expectedBytes = expectedByteValue.remaining();
ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
@@ -1237,7 +1241,7 @@ public abstract class CQLTester
protected com.datastax.driver.core.TupleType tupleTypeOf(int protocolVersion, DataType...types)
{
requireNetwork();
- return cluster[protocolVersion -1].getMetadata().newTupleType(types);
+ return clusters.get(protocolVersion).getMetadata().newTupleType(types);
}
// Attempt to find an AbstractType from a value (for serialization/printing sake).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index 0d590b2..a947593 100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@ -3,7 +3,6 @@ package org.apache.cassandra.cql3;
import org.junit.Test;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import static org.junit.Assert.assertEquals;
@@ -77,7 +76,7 @@ public class IndexQueryPagingTest extends CQLTester
// setting the fetch size smaller than the row count. Assert
// that all rows are returned, so we know that paging
// of the results was involved.
- Session session = sessionNet(maxProtocolVersion);
+ Session session = sessionNet();
Statement stmt = session.newSimpleStatement(String.format(cql, KEYSPACE + "." + currentTable()));
stmt.setFetchSize(rowCount - 1);
assertEquals(rowCount, session.execute(stmt).all().size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
index f43e335..ef06dbf 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
@@ -89,7 +89,7 @@ public class UFPureScriptTest extends CQLTester
assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
row(list, set, map));
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
assertRowsNet(version,
executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
row(list, set, map));
@@ -193,7 +193,7 @@ public class UFPureScriptTest extends CQLTester
DataType.map(DataType.cint(),
DataType.cboolean()));
TupleValue tup = tType.newValue(1d, list, set, map);
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
assertRowsNet(version,
executeNet(version, "SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
@@ -303,7 +303,7 @@ public class UFPureScriptTest extends CQLTester
row("three", "one", "two"));
// same test - but via native protocol
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
assertRowsNet(version,
executeNet(version, cqlSelect),
row("three", "one", "two"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 6bd03ad..ce50767 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -29,7 +29,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.security.AccessControlException;
-import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
@@ -932,7 +931,7 @@ public class UFTest extends CQLTester
row(list, set, map));
// same test - but via native protocol
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
assertRowsNet(version,
executeNet(version, "SELECT " + fList + "(lst), " + fSet + "(st), " + fMap + "(mp) FROM %s WHERE key = 1"),
row(list, set, map));
@@ -1041,7 +1040,7 @@ public class UFTest extends CQLTester
Assert.assertNull(row.getBytes("t"));
Assert.assertNull(row.getBytes("u"));
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
Row r = executeNet(version, "SELECT " +
fList + "(lst) as l, " +
@@ -1168,7 +1167,7 @@ public class UFTest extends CQLTester
DataType.set(DataType.text()),
DataType.map(DataType.cint(), DataType.cboolean()));
TupleValue tup = tType.newValue(1d, list, set, map);
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
assertRowsNet(version,
executeNet(version, "SELECT " + fTup0 + "(tup) FROM %s WHERE key = 1"),
@@ -1195,7 +1194,7 @@ public class UFTest extends CQLTester
createTable("CREATE TABLE %s (key int primary key, udt frozen<" + KEYSPACE + '.' + type + ">)");
execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
executeNet(version, "USE " + KEYSPACE);
@@ -1259,7 +1258,7 @@ public class UFTest extends CQLTester
assertRows(execute("SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"),
row(1));
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
List<Row> rowsNet = executeNet(version, "SELECT " + fUdt0 + "(udt) FROM %s WHERE key = 1").all();
Assert.assertEquals(1, rowsNet.size());
@@ -1521,7 +1520,7 @@ public class UFTest extends CQLTester
assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
row("three", "one", "two"));
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
assertRowsNet(version,
executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
row("three", "one", "two"));
@@ -1777,7 +1776,7 @@ public class UFTest extends CQLTester
"LANGUAGE JAVA\n" +
"AS 'throw new RuntimeException();'");
- for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+ for (int version : PROTOCOL_VERSIONS)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index 7716b4c..5cdeb78 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -18,20 +18,19 @@
package org.apache.cassandra.service;
import org.apache.commons.lang3.StringUtils;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.transport.messages.QueryMessage;
import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
public class ClientWarningsTest extends CQLTester
{
@@ -77,21 +76,6 @@ public class ClientWarningsTest extends CQLTester
}
}
- @Test
- public void testLargeBatchWithProtoV2() throws Exception
- {
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
- try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
- {
- client.connect(false);
-
- QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
- Message.Response resp = client.execute(query);
- assertNull(resp.getWarnings());
- }
- }
-
private String createBatchStatement(int minSize)
{
return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
index 9910167..f47f355 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -20,25 +20,36 @@ package org.apache.cassandra.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.cassandra.transport.messages.ErrorMessage;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.cassandra.transport.Message.Direction.*;
+
public class ProtocolErrorTest {
@Test
public void testInvalidProtocolVersion() throws Exception
{
+ // test using a protocol version higher than the current version
+ testInvalidProtocolVersion(Server.CURRENT_VERSION + 1);
+ // test using a protocol version lower than the lowest version
+ testInvalidProtocolVersion(Server.MIN_SUPPORTED_VERSION - 1);
+
+ }
+
+ public void testInvalidProtocolVersion(int version) throws Exception
+ {
Frame.Decoder dec = new Frame.Decoder(null);
List<Object> results = new ArrayList<>();
- // should generate a protocol exception for using a protocol version higher than the current version
byte[] frame = new byte[] {
- (byte) ((Server.CURRENT_VERSION + 1) & Frame.PROTOCOL_VERSION_MASK), // direction & version
+ (byte) REQUEST.addToVersion(version), // direction & version
0x00, // flags
- 0x01, // stream ID
+ 0x00, 0x01, // stream ID
0x09, // opcode
0x00, 0x00, 0x00, 0x21, // body length
0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
@@ -52,6 +63,7 @@ public class ProtocolErrorTest {
dec.decode(null, buf, results);
Assert.fail("Expected protocol error");
} catch (ProtocolException e) {
+ Assert.assertTrue(e.getMessage().contains("Invalid or unsupported protocol version"));
}
}
@@ -64,9 +76,9 @@ public class ProtocolErrorTest {
// should generate a protocol exception for using a response frame with
// a prepare op, ensure that it comes back with stream ID 1
byte[] frame = new byte[] {
- (byte) 0x82, // direction & version
+ (byte) RESPONSE.addToVersion(Server.CURRENT_VERSION), // direction & version
0x00, // flags
- 0x01, // stream ID
+ 0x00, 0x01, // stream ID
0x09, // opcode
0x00, 0x00, 0x00, 0x21, // body length
0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
@@ -82,29 +94,7 @@ public class ProtocolErrorTest {
} catch (ErrorMessage.WrappedException e) {
// make sure the exception has the correct stream ID
Assert.assertEquals(1, e.getStreamId());
- }
- }
-
- @Test
- public void testNegativeBodyLength() throws Exception
- {
- Frame.Decoder dec = new Frame.Decoder(null);
-
- List<Object> results = new ArrayList<>();
- byte[] frame = new byte[] {
- (byte) 0x82, // direction & version
- 0x00, // flags
- 0x01, // stream ID
- 0x09, // opcode
- (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length (-1)
- };
- ByteBuf buf = Unpooled.wrappedBuffer(frame);
- try {
- dec.decode(null, buf, results);
- Assert.fail("Expected protocol error");
- } catch (ErrorMessage.WrappedException e) {
- // make sure the exception has the correct stream ID
- Assert.assertEquals(1, e.getStreamId());
+ Assert.assertTrue(e.getMessage().contains("Wrong protocol direction"));
}
}
@@ -115,19 +105,21 @@ public class ProtocolErrorTest {
List<Object> results = new ArrayList<>();
byte[] frame = new byte[] {
- (byte) 0x82, // direction & version
+ (byte) REQUEST.addToVersion(Server.CURRENT_VERSION), // direction & version
0x00, // flags
- 0x01, // stream ID
+ 0x00, 0x01, // stream ID
0x09, // opcode
- 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length
+ 0x10, (byte) 0x00, (byte) 0x00, (byte) 0x00, // body length
};
- ByteBuf buf = Unpooled.wrappedBuffer(frame);
+ byte[] body = new byte[0x10000000];
+ ByteBuf buf = Unpooled.wrappedBuffer(frame, body);
try {
dec.decode(null, buf, results);
Assert.fail("Expected protocol error");
} catch (ErrorMessage.WrappedException e) {
// make sure the exception has the correct stream ID
Assert.assertEquals(1, e.getStreamId());
+ Assert.assertTrue(e.getMessage().contains("Request is too big"));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 352327e..fdb346e 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf;
import org.junit.Test;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.transport.Event.TopologyChange;
@@ -46,8 +45,8 @@ public class SerDeserTest
@Test
public void collectionSerDeserTest() throws Exception
{
- collectionSerDeserTest(2);
collectionSerDeserTest(3);
+ collectionSerDeserTest(4);
}
public void collectionSerDeserTest(int version) throws Exception
@@ -93,7 +92,6 @@ public class SerDeserTest
@Test
public void eventSerDeserTest() throws Exception
{
- eventSerDeserTest(2);
eventSerDeserTest(3);
eventSerDeserTest(4);
}
@@ -173,8 +171,8 @@ public class SerDeserTest
@Test
public void udtSerDeserTest() throws Exception
{
- udtSerDeserTest(2);
udtSerDeserTest(3);
+ udtSerDeserTest(4);
}
public void udtSerDeserTest(int version) throws Exception
@@ -200,10 +198,6 @@ public class SerDeserTest
Term t = u.prepare("ks", columnSpec("myValue", udt));
QueryOptions options = QueryOptions.DEFAULT;
- if (version == 2)
- options = QueryOptions.fromProtocolV2(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
- else if (version != 3)
- throw new AssertionError("Invalid protocol version for test");
ByteBuffer serialized = t.bindAndGet(options);