You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/10 22:09:55 UTC

[5/7] cassandra git commit: Drop support for protocol v1 and v2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 8992aed..d65e3a6 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -22,23 +22,18 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Iterator;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.Sets;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -48,10 +43,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public abstract class CollectionType<T> extends AbstractType<T>
 {
-    private static final Logger logger = LoggerFactory.getLogger(CollectionType.class);
-
-    public static final int MAX_ELEMENTS = 65535;
-
     public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer();
 
     public enum Kind
@@ -148,26 +139,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return values.size();
     }
 
-    protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version)
-    {
-        assert isMultiCell();
-
-        int size = collectionSize(values);
-        if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS)
-            return size;
-
-        logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" +
-                     " elements will be returned to the client. Please see " +
-                     "http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.",
-                     def.ksName, def.cfName, values.size(), MAX_ELEMENTS, MAX_ELEMENTS);
-        return MAX_ELEMENTS;
-    }
-
     public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version)
     {
         assert isMultiCell();
         List<ByteBuffer> values = serializedValues(cells);
-        int size = enforceLimit(def, values, version);
+        int size = collectionSize(values);
         return CollectionSerializer.pack(values, size, version);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 5fb3e0a..3d6be67 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -50,11 +50,6 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         return deserializeForNativeProtocol(bytes, Server.VERSION_3);
     }
 
-    public ByteBuffer reserializeToV3(ByteBuffer bytes)
-    {
-        return serialize(deserializeForNativeProtocol(bytes, 2));
-    }
-
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         // Same thing as above
@@ -76,69 +71,42 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
 
     protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
     {
-        if (version >= Server.VERSION_3)
             output.putInt(elements);
-        else
-            output.putShort((short)elements);
     }
 
     public static int readCollectionSize(ByteBuffer input, int version)
     {
-        return version >= Server.VERSION_3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+        return input.getInt();
     }
 
     protected static int sizeOfCollectionSize(int elements, int version)
     {
-        return version >= Server.VERSION_3 ? 4 : 2;
+        return 4;
     }
 
     public static void writeValue(ByteBuffer output, ByteBuffer value, int version)
     {
-        if (version >= Server.VERSION_3)
+        if (value == null)
         {
-            if (value == null)
-            {
-                output.putInt(-1);
-                return;
-            }
-
-            output.putInt(value.remaining());
-            output.put(value.duplicate());
-        }
-        else
-        {
-            assert value != null;
-            output.putShort((short)value.remaining());
-            output.put(value.duplicate());
+            output.putInt(-1);
+            return;
         }
+
+        output.putInt(value.remaining());
+        output.put(value.duplicate());
     }
 
     public static ByteBuffer readValue(ByteBuffer input, int version)
     {
-        if (version >= Server.VERSION_3)
-        {
-            int size = input.getInt();
-            if (size < 0)
-                return null;
+        int size = input.getInt();
+        if (size < 0)
+            return null;
 
-            return ByteBufferUtil.readBytes(input, size);
-        }
-        else
-        {
-            return ByteBufferUtil.readBytesWithShortLength(input);
-        }
+        return ByteBufferUtil.readBytes(input, size);
     }
 
     public static int sizeOfValue(ByteBuffer value, int version)
     {
-        if (version >= Server.VERSION_3)
-        {
-            return value == null ? 4 : 4 + value.remaining();
-        }
-        else
-        {
-            assert value != null;
-            return 2 + value.remaining();
-        }
+        return value == null ? 4 : 4 + value.remaining();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 9cd1653..14cd812 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -2286,7 +2286,7 @@ public class CassandraServer implements Cassandra.Iface
             ThriftClientState cState = state();
             return ClientState.getCQLQueryHandler().process(queryString,
                                                             cState.getQueryState(),
-                                                            QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel),
+                                                            QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel),
                                                             Collections.<ByteBuffer>emptyList()),
                                                             null).toThriftResult();
         }
@@ -2358,7 +2358,7 @@ public class CassandraServer implements Cassandra.Iface
 
             return ClientState.getCQLQueryHandler().processPrepared(prepared.statement,
                                                                     cState.getQueryState(),
-                                                                    QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables),
+                                                                    QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables),
                                                                     null).toThriftResult();
         }
         catch (RequestExecutionException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 9b6fdd4..b7f4650 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
@@ -29,9 +28,9 @@ import io.netty.buffer.ByteBuf;
 public abstract class Event
 {
     public enum Type {
-        TOPOLOGY_CHANGE(Server.VERSION_1),
-        STATUS_CHANGE(Server.VERSION_1),
-        SCHEMA_CHANGE(Server.VERSION_1),
+        TOPOLOGY_CHANGE(Server.VERSION_3),
+        STATUS_CHANGE(Server.VERSION_3),
+        SCHEMA_CHANGE(Server.VERSION_3),
         TRACE_COMPLETE(Server.VERSION_4);
 
         public final int minimumVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 14fe589..04cc95e 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -27,7 +27,6 @@ import io.netty.channel.*;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageEncoder;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -50,16 +49,6 @@ public class Frame
      *   +---------+---------+---------+---------+---------+
      *   |                length                 |
      *   +---------+---------+---------+---------+
-     *
-     *
-     * In versions 1 and 2 the header has a smaller (1 byte) stream id, and is thus defined the following way:
-     *
-     *   0         8        16        24        32
-     *   +---------+---------+---------+---------+
-     *   | version |  flags  | stream  | opcode  |
-     *   +---------+---------+---------+---------+
-     *   |                length                 |
-     *   +---------+---------+---------+---------+
      */
     private Frame(Header header, ByteBuf body)
     {
@@ -85,9 +74,8 @@ public class Frame
 
     public static class Header
     {
-        // 8 bytes in protocol versions 1 and 2, 8 bytes in protocol version 3 and later
-        public static final int MODERN_LENGTH = 9;
-        public static final int LEGACY_LENGTH = 8;
+        // 9 bytes in protocol version 3 and later
+        public static final int LENGTH = 9;
 
         public static final int BODY_LENGTH_SIZE = 4;
 
@@ -174,8 +162,8 @@ public class Frame
                 return;
             }
 
-            // Wait until we have read at least the short header
-            if (buffer.readableBytes() < Header.LEGACY_LENGTH)
+            // Wait until we have the complete header
+            if (buffer.readableBytes() < Header.LENGTH)
                 return;
 
             int idx = buffer.readerIndex();
@@ -184,29 +172,14 @@ public class Frame
             Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
             int version = firstByte & PROTOCOL_VERSION_MASK;
 
-            if (version > Server.CURRENT_VERSION)
-                throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); highest supported is %d ",
-                                                          version, Server.CURRENT_VERSION));
-
-            // Wait until we have the complete V3+ header
-            if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH)
-                return;
+            if (version < Server.MIN_SUPPORTED_VERSION || version > Server.CURRENT_VERSION)
+                throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d",
+                                                          version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION));
 
             int flags = buffer.getByte(idx++);
 
-            int streamId, headerLength;
-            if (version >= Server.VERSION_3)
-            {
-                streamId = buffer.getShort(idx);
-                idx += 2;
-                headerLength = Header.MODERN_LENGTH;
-            }
-            else
-            {
-                streamId = buffer.getByte(idx);
-                idx++;
-                headerLength = Header.LEGACY_LENGTH;
-            }
+            int streamId = buffer.getShort(idx);
+            idx += 2;
 
             // This throws a protocol exceptions if the opcode is unknown
             Message.Type type;
@@ -222,13 +195,7 @@ public class Frame
             long bodyLength = buffer.getUnsignedInt(idx);
             idx += Header.BODY_LENGTH_SIZE;
 
-            if (bodyLength < 0)
-            {
-                buffer.skipBytes(headerLength);
-                throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId);
-            }
-
-            long frameLength = bodyLength + headerLength;
+            long frameLength = bodyLength + Header.LENGTH;
             if (frameLength > MAX_FRAME_LENGTH)
             {
                 // Enter the discard mode and discard everything received so far.
@@ -295,10 +262,7 @@ public class Frame
         public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
-            int headerLength = frame.header.version >= Server.VERSION_3
-                             ? Header.MODERN_LENGTH
-                             : Header.LEGACY_LENGTH;
-            ByteBuf header = CBUtil.allocator.buffer(headerLength);
+            ByteBuf header = CBUtil.allocator.buffer(Header.LENGTH);
 
             Message.Type type = frame.header.type;
             header.writeByte(type.direction.addToVersion(frame.header.version));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index cafc0ce..a9c9bee 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -63,11 +63,10 @@ public class Server implements CassandraDaemon.Server
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
     private static final boolean useEpoll = NativeTransportService.useEpoll();
 
-    public static final int VERSION_1 = 1;
-    public static final int VERSION_2 = 2;
     public static final int VERSION_3 = 3;
     public static final int VERSION_4 = 4;
     public static final int CURRENT_VERSION = VERSION_4;
+    public static final int MIN_SUPPORTED_VERSION = VERSION_3;
 
     private final ConnectionTracker connectionTracker = new ConnectionTracker();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 2db380b..5baf1a6 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -45,9 +45,6 @@ public class BatchMessage extends Message.Request
     {
         public BatchMessage decode(ByteBuf body, int version)
         {
-            if (version == 1)
-                throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
-
             byte type = body.readByte();
             int n = body.readUnsignedShort();
             List<Object> queryOrIds = new ArrayList<>(n);
@@ -63,9 +60,7 @@ public class BatchMessage extends Message.Request
                     throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
                 variables.add(CBUtil.readValueList(body, version));
             }
-            QueryOptions options = version < 3
-                                 ? QueryOptions.fromPreV3Batch(CBUtil.readConsistencyLevel(body))
-                                 : QueryOptions.codec.decode(body, version);
+            QueryOptions options = QueryOptions.codec.decode(body, version);
 
             return new BatchMessage(toType(type), queryOrIds, variables, options);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 718595c..940a0fc 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
@@ -28,7 +26,6 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -45,16 +42,7 @@ public class ExecuteMessage extends Message.Request
         public ExecuteMessage decode(ByteBuf body, int version)
         {
             byte[] id = CBUtil.readBytes(body);
-            if (version == 1)
-            {
-                List<ByteBuffer> values = CBUtil.readValueList(body, version);
-                ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-                return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.fromProtocolV1(consistency, values));
-            }
-            else
-            {
-                return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version));
-            }
+            return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version));
         }
 
         public void encode(ExecuteMessage msg, ByteBuf dest, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 67f3734..3b48d52 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -17,20 +17,20 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -44,15 +44,7 @@ public class QueryMessage extends Message.Request
         public QueryMessage decode(ByteBuf body, int version)
         {
             String query = CBUtil.readLongString(body);
-            if (version == 1)
-            {
-                ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-                return new QueryMessage(query, QueryOptions.fromProtocolV1(consistency, Collections.<ByteBuffer>emptyList()));
-            }
-            else
-            {
-                return new QueryMessage(query, QueryOptions.codec.decode(body, version));
-            }
+            return new QueryMessage(query, QueryOptions.codec.decode(body, version));
         }
 
         public void encode(QueryMessage msg, ByteBuf dest, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 3d3729a..25dfc28 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -32,12 +32,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.ResultSet;
+
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+
 import org.junit.*;
+
+import com.datastax.driver.core.Cluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
@@ -52,7 +56,6 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.service.ClientState;
@@ -62,7 +65,6 @@ import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
 import static junit.framework.Assert.assertNotNull;
 
 /**
@@ -81,30 +83,29 @@ public abstract class CQLTester
     private static org.apache.cassandra.transport.Server server;
     protected static final int nativePort;
     protected static final InetAddress nativeAddr;
-    private static final Cluster[] cluster;
-    private static final Session[] session;
+    private static final Map<Integer, Cluster> clusters = new HashMap<>();
+    private static final Map<Integer, Session> sessions = new HashMap<>();
 
     private static boolean isServerPrepared = false;
 
-    public static int maxProtocolVersion;
+    public static final List<Integer> PROTOCOL_VERSIONS;
     static
     {
-        int version;
-        for (version = 1; version <= Server.CURRENT_VERSION; )
+        // The latest versions might not be supported yet by the java driver
+        ImmutableList.Builder<Integer> builder = ImmutableList.builder();
+        for (int version = Server.MIN_SUPPORTED_VERSION; version <= Server.CURRENT_VERSION; version++)
         {
             try
             {
-                ProtocolVersion.fromInt(++version);
+                ProtocolVersion.fromInt(version);
+                builder.add(version);
             }
             catch (IllegalArgumentException e)
             {
-                version--;
                 break;
             }
         }
-        maxProtocolVersion = version;
-        cluster = new Cluster[maxProtocolVersion];
-        session = new Session[maxProtocolVersion];
+        PROTOCOL_VERSIONS = builder.build();
 
         // Once per-JVM is enough
         prepareServer(true);
@@ -227,11 +228,9 @@ public abstract class CQLTester
     @AfterClass
     public static void tearDownClass()
     {
-        for (Session sess : session)
-            if (sess != null)
+        for (Session sess : sessions.values())
                 sess.close();
-        for (Cluster cl : cluster)
-            if (cl != null)
+        for (Cluster cl : clusters.values())
                 cl.close();
 
         if (server != null)
@@ -319,17 +318,19 @@ public abstract class CQLTester
         server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
         server.start();
 
-        for (int version = 1; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
-            if (cluster[version-1] != null)
+            if (clusters.containsKey(version))
                 continue;
 
-            cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr)
-                                  .withClusterName("Test Cluster")
-                                  .withPort(nativePort)
-                                  .withProtocolVersion(ProtocolVersion.fromInt(version))
-                                  .build();
-            session[version-1] = cluster[version-1].connect();
+            Cluster cluster = Cluster.builder()
+                                     .addContactPoints(nativeAddr)
+                                     .withClusterName("Test Cluster")
+                                     .withPort(nativePort)
+                                     .withProtocolVersion(ProtocolVersion.fromInt(version))
+                                     .build();
+            clusters.put(version, cluster);
+            sessions.put(version, cluster.connect());
 
             logger.info("Started Java Driver instance for protocol version {}", version);
         }
@@ -623,16 +624,19 @@ public abstract class CQLTester
 
     protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
     {
-        requireNetwork();
+        return sessionNet(protocolVersion).execute(formatQuery(query), values);
+    }
 
-        return session[protocolVersion-1].execute(formatQuery(query), values);
+    protected Session sessionNet()
+    {
+        return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
     }
 
     protected Session sessionNet(int protocolVersion)
     {
         requireNetwork();
 
-        return session[protocolVersion-1];
+        return sessions.get(protocolVersion);
     }
 
     private String formatQuery(String query)
@@ -696,9 +700,9 @@ public abstract class CQLTester
             for (int j = 0; j < meta.size(); j++)
             {
                 DataType type = meta.getType(j);
-                com.datastax.driver.core.TypeCodec<Object> codec = cluster[protocolVersion -1].getConfiguration()
-                                                                                              .getCodecRegistry()
-                                                                                              .codecFor(type);
+                com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration()
+                                                                                                .getCodecRegistry()
+                                                                                                .codecFor(type);
                 ByteBuffer expectedByteValue = codec.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
                 int expectedBytes = expectedByteValue.remaining();
                 ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
@@ -1237,7 +1241,7 @@ public abstract class CQLTester
     protected com.datastax.driver.core.TupleType tupleTypeOf(int protocolVersion, DataType...types)
     {
         requireNetwork();
-        return cluster[protocolVersion -1].getMetadata().newTupleType(types);
+        return clusters.get(protocolVersion).getMetadata().newTupleType(types);
     }
 
     // Attempt to find an AbstracType from a value (for serialization/printing sake).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index 0d590b2..a947593 100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@ -3,7 +3,6 @@ package org.apache.cassandra.cql3;
 import org.junit.Test;
 
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
 
 import static org.junit.Assert.assertEquals;
@@ -77,7 +76,7 @@ public class IndexQueryPagingTest extends CQLTester
         // setting the fetch size < than the row count. Assert
         // that all rows are returned, so we know that paging
         // of the results was involved.
-        Session session = sessionNet(maxProtocolVersion);
+        Session session = sessionNet();
         Statement stmt = session.newSimpleStatement(String.format(cql, KEYSPACE + "." + currentTable()));
         stmt.setFetchSize(rowCount - 1);
         assertEquals(rowCount, session.execute(stmt).all().size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
index f43e335..ef06dbf 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
@@ -89,7 +89,7 @@ public class UFPureScriptTest extends CQLTester
         assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
                    row(list, set, map));
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
             assertRowsNet(version,
                           executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
                           row(list, set, map));
@@ -193,7 +193,7 @@ public class UFPureScriptTest extends CQLTester
                                       DataType.map(DataType.cint(),
                                                    DataType.cboolean()));
         TupleValue tup = tType.newValue(1d, list, set, map);
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             assertRowsNet(version,
                           executeNet(version, "SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
@@ -303,7 +303,7 @@ public class UFPureScriptTest extends CQLTester
                    row("three", "one", "two"));
 
         // same test - but via native protocol
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
             assertRowsNet(version,
                           executeNet(version, cqlSelect),
                           row("three", "one", "two"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 6bd03ad..ce50767 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -29,7 +29,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.security.AccessControlException;
 
-import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -932,7 +931,7 @@ public class UFTest extends CQLTester
                    row(list, set, map));
 
         // same test - but via native protocol
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
             assertRowsNet(version,
                           executeNet(version, "SELECT " + fList + "(lst), " + fSet + "(st), " + fMap + "(mp) FROM %s WHERE key = 1"),
                           row(list, set, map));
@@ -1041,7 +1040,7 @@ public class UFTest extends CQLTester
         Assert.assertNull(row.getBytes("t"));
         Assert.assertNull(row.getBytes("u"));
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             Row r = executeNet(version, "SELECT " +
                                         fList + "(lst) as l, " +
@@ -1168,7 +1167,7 @@ public class UFTest extends CQLTester
                                       DataType.set(DataType.text()),
                                       DataType.map(DataType.cint(), DataType.cboolean()));
         TupleValue tup = tType.newValue(1d, list, set, map);
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             assertRowsNet(version,
                           executeNet(version, "SELECT " + fTup0 + "(tup) FROM %s WHERE key = 1"),
@@ -1195,7 +1194,7 @@ public class UFTest extends CQLTester
         createTable("CREATE TABLE %s (key int primary key, udt frozen<" + KEYSPACE + '.' + type + ">)");
         execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             executeNet(version, "USE " + KEYSPACE);
 
@@ -1259,7 +1258,7 @@ public class UFTest extends CQLTester
         assertRows(execute("SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"),
                    row(1));
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             List<Row> rowsNet = executeNet(version, "SELECT " + fUdt0 + "(udt) FROM %s WHERE key = 1").all();
             Assert.assertEquals(1, rowsNet.size());
@@ -1521,7 +1520,7 @@ public class UFTest extends CQLTester
         assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
                    row("three", "one", "two"));
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
             assertRowsNet(version,
                           executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
                           row("three", "one", "two"));
@@ -1777,7 +1776,7 @@ public class UFTest extends CQLTester
                                       "LANGUAGE JAVA\n" +
                                       "AS 'throw new RuntimeException();'");
 
-        for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
+        for (int version : PROTOCOL_VERSIONS)
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index 7716b4c..5cdeb78 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -18,20 +18,19 @@
 package org.apache.cassandra.service;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.transport.messages.QueryMessage;
 
 import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
 
 public class ClientWarningsTest extends CQLTester
 {
@@ -77,21 +76,6 @@ public class ClientWarningsTest extends CQLTester
         }
     }
 
-    @Test
-    public void testLargeBatchWithProtoV2() throws Exception
-    {
-        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
-        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
-        {
-            client.connect(false);
-
-            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
-            Message.Response resp = client.execute(query);
-            assertNull(resp.getWarnings());
-        }
-    }
-
     private String createBatchStatement(int minSize)
     {
         return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
index 9910167..f47f355 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -20,25 +20,36 @@ package org.apache.cassandra.transport;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.cassandra.transport.messages.ErrorMessage;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.cassandra.transport.Message.Direction.*;
+
 public class ProtocolErrorTest {
 
     @Test
     public void testInvalidProtocolVersion() throws Exception
     {
+        // test using a protocol version higher than the current version
+        testInvalidProtocolVersion(Server.CURRENT_VERSION + 1);
+        // test using a protocol version lower than the lowest version
+        testInvalidProtocolVersion(Server.MIN_SUPPORTED_VERSION - 1);
+
+    }
+
+    public void testInvalidProtocolVersion(int version) throws Exception
+    {
         Frame.Decoder dec = new Frame.Decoder(null);
 
         List<Object> results = new ArrayList<>();
-        // should generate a protocol exception for using a protocol version higher than the current version
         byte[] frame = new byte[] {
-                (byte) ((Server.CURRENT_VERSION + 1) & Frame.PROTOCOL_VERSION_MASK),  // direction & version
+                (byte) REQUEST.addToVersion(version),  // direction & version
                 0x00,  // flags
-                0x01,  // stream ID
+                0x00, 0x01,  // stream ID
                 0x09,  // opcode
                 0x00, 0x00, 0x00, 0x21,  // body length
                 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
@@ -52,6 +63,7 @@ public class ProtocolErrorTest {
             dec.decode(null, buf, results);
             Assert.fail("Expected protocol error");
         } catch (ProtocolException e) {
+            Assert.assertTrue(e.getMessage().contains("Invalid or unsupported protocol version"));
         }
     }
 
@@ -64,9 +76,9 @@ public class ProtocolErrorTest {
         // should generate a protocol exception for using a response frame with
         // a prepare op, ensure that it comes back with stream ID 1
         byte[] frame = new byte[] {
-                (byte) 0x82,  // direction & version
+                (byte) RESPONSE.addToVersion(Server.CURRENT_VERSION),  // direction & version
                 0x00,  // flags
-                0x01,  // stream ID
+                0x00, 0x01,  // stream ID
                 0x09,  // opcode
                 0x00, 0x00, 0x00, 0x21,  // body length
                 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
@@ -82,29 +94,7 @@ public class ProtocolErrorTest {
         } catch (ErrorMessage.WrappedException e) {
             // make sure the exception has the correct stream ID
             Assert.assertEquals(1, e.getStreamId());
-        }
-    }
-
-    @Test
-    public void testNegativeBodyLength() throws Exception
-    {
-        Frame.Decoder dec = new Frame.Decoder(null);
-
-        List<Object> results = new ArrayList<>();
-        byte[] frame = new byte[] {
-                (byte) 0x82,  // direction & version
-                0x00,  // flags
-                0x01,  // stream ID
-                0x09,  // opcode
-                (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length (-1)
-        };
-        ByteBuf buf = Unpooled.wrappedBuffer(frame);
-        try {
-            dec.decode(null, buf, results);
-            Assert.fail("Expected protocol error");
-        } catch (ErrorMessage.WrappedException e) {
-            // make sure the exception has the correct stream ID
-            Assert.assertEquals(1, e.getStreamId());
+            Assert.assertTrue(e.getMessage().contains("Wrong protocol direction"));
         }
     }
 
@@ -115,19 +105,21 @@ public class ProtocolErrorTest {
 
         List<Object> results = new ArrayList<>();
         byte[] frame = new byte[] {
-                (byte) 0x82,  // direction & version
+                (byte) REQUEST.addToVersion(Server.CURRENT_VERSION),  // direction & version
                 0x00,  // flags
-                0x01,  // stream ID
+                0x00, 0x01,  // stream ID
                 0x09,  // opcode
-                0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length
+                0x10, (byte) 0x00, (byte) 0x00, (byte) 0x00,  // body length
         };
-        ByteBuf buf = Unpooled.wrappedBuffer(frame);
+        byte[] body = new byte[0x10000000];
+        ByteBuf buf = Unpooled.wrappedBuffer(frame, body);
         try {
             dec.decode(null, buf, results);
             Assert.fail("Expected protocol error");
         } catch (ErrorMessage.WrappedException e) {
             // make sure the exception has the correct stream ID
             Assert.assertEquals(1, e.getStreamId());
+            Assert.assertTrue(e.getMessage().contains("Request is too big"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 352327e..fdb346e 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf;
 
 import org.junit.Test;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.transport.Event.TopologyChange;
@@ -46,8 +45,8 @@ public class SerDeserTest
     @Test
     public void collectionSerDeserTest() throws Exception
     {
-        collectionSerDeserTest(2);
         collectionSerDeserTest(3);
+        collectionSerDeserTest(4);
     }
 
     public void collectionSerDeserTest(int version) throws Exception
@@ -93,7 +92,6 @@ public class SerDeserTest
     @Test
     public void eventSerDeserTest() throws Exception
     {
-        eventSerDeserTest(2);
         eventSerDeserTest(3);
         eventSerDeserTest(4);
     }
@@ -173,8 +171,8 @@ public class SerDeserTest
     @Test
     public void udtSerDeserTest() throws Exception
     {
-        udtSerDeserTest(2);
         udtSerDeserTest(3);
+        udtSerDeserTest(4);
     }
 
     public void udtSerDeserTest(int version) throws Exception
@@ -200,10 +198,6 @@ public class SerDeserTest
         Term t = u.prepare("ks", columnSpec("myValue", udt));
 
         QueryOptions options = QueryOptions.DEFAULT;
-        if (version == 2)
-            options = QueryOptions.fromProtocolV2(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
-        else if (version != 3)
-            throw new AssertionError("Invalid protocol version for test");
 
         ByteBuffer serialized = t.bindAndGet(options);