You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/02/27 23:11:47 UTC

[1/2] cassandra git commit: Preserve stream ID for more protocol errors

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 649a5cd12 -> e76ebcec1


Preserve stream ID for more protocol errors

Patch by Chris Bannister and Tyler Hobbs for CASSANDRA-8848


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5654e736
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5654e736
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5654e736

Branch: refs/heads/cassandra-2.1
Commit: 5654e7368c2d68b3701cb5cdf190975f1079b10c
Parents: 98502e0
Author: Chris Bannister <c....@gmail.com>
Authored: Fri Feb 27 16:09:26 2015 -0600
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Fri Feb 27 16:09:26 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 .../org/apache/cassandra/transport/Frame.java   | 18 +++-
 .../transport/messages/ErrorMessage.java        |  9 +-
 .../cassandra/transport/ProtocolErrorTest.java  | 97 ++++++++++++++++++++
 4 files changed, 123 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2b4469..3a8d824 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.13:
+ * Preserve stream ID for more protocol errors (CASSANDRA-8848)
  * Fix combining token() function with multi-column relations on
    clustering columns (CASSANDRA-8797)
  * Make CFS.markReferenced() resistant to bad refcounting (CASSANDRA-8829)
@@ -7,7 +8,8 @@
    table with ASC ordering and paging (CASSANDRA-8767)
  * AssertionError: "Memory was freed" when running cleanup (CASSANDRA-8716)
  * Make it possible to set max_sstable_age to fractional days (CASSANDRA-8406)
- * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate() (CASSANDRA-8748)
+ * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate()
+   (CASSANDRA-8748)
  * Fix some multi-column relations with indexes on some clustering
    columns (CASSANDRA-8275)
  * Fix IllegalArgumentException in dynamic snitch (CASSANDRA-8448)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 89755df..7520c41 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -166,14 +166,22 @@ public class Frame
             int streamId = buffer.getByte(idx + 2);
 
             // This throws a protocol exceptions if the opcode is unknown
-            Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+            Message.Type type;
+            try
+            {
+                type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+            }
+            catch (ProtocolException e)
+            {
+                throw ErrorMessage.wrap(e, streamId);
+            }
 
             long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
 
             if (bodyLength < 0)
             {
                 buffer.skipBytes(Header.LENGTH);
-                throw new ProtocolException("Invalid frame body length: " + bodyLength);
+                throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId);
             }
 
             long frameLength = bodyLength + Header.LENGTH;
@@ -207,7 +215,11 @@ public class Frame
             }
             else if (connection.getVersion() != version)
             {
-                throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
+                throw ErrorMessage.wrap(
+                        new ProtocolException(String.format(
+                                "Invalid message version. Got %d but previous messages on this connection had version %d",
+                                version, connection.getVersion())),
+                        streamId);
             }
 
             return new Frame(new Header(version, flags, streamId, type), body);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 4d60a1f..e27fb88 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.transport.messages;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
@@ -243,7 +244,8 @@ public class ErrorMessage extends Message.Response
         return new WrappedException(t, streamId);
     }
 
-    private static class WrappedException extends RuntimeException
+    @VisibleForTesting
+    public static class WrappedException extends RuntimeException
     {
         private final int streamId;
 
@@ -252,6 +254,11 @@ public class ErrorMessage extends Message.Response
             super(cause);
             this.streamId = streamId;
         }
+
+        public int getStreamId()
+        {
+            return this.streamId;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
new file mode 100644
index 0000000..387d159
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.HeapChannelBufferFactory;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ProtocolErrorTest {
+
+    @Test
+    public void testInvalidDirection() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        // should generate a protocol exception for using a response frame with
+        // a prepare op, ensure that it comes back with stream ID 1
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                0x00, 0x00, 0x00, 0x21,  // body length
+                0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
+                0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46,
+                0x52, 0x4f, 0x4d, 0x20, 0x73, 0x79, 0x73, 0x74,
+                0x65, 0x6d, 0x2e, 0x6c, 0x6f, 0x63, 0x61, 0x6c,
+                0x3b
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+
+    @Test
+    public void testNegativeBodyLength() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length (-1)
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+
+    @Test
+    public void testBodyLengthOverLimit() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+}


[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e76ebcec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e76ebcec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e76ebcec

Branch: refs/heads/cassandra-2.1
Commit: e76ebcec14a4c88c2433bdc0a4e4cd656ba5af2a
Parents: 649a5cd 5654e73
Author: Tyler Hobbs <ty...@apache.org>
Authored: Fri Feb 27 16:11:37 2015 -0600
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Fri Feb 27 16:11:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +-
 .../org/apache/cassandra/transport/Frame.java   |  18 +++-
 .../transport/messages/ErrorMessage.java        |   9 +-
 .../cassandra/transport/ProtocolErrorTest.java  | 103 +++++++++++++++++++
 4 files changed, 129 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e76ebcec/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ec5e233,3a8d824..a3e52f7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,5 +1,25 @@@
 -2.0.13:
 +2.1.4
 + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
 + * Fix parallelism adjustment in range and secondary index queries
 +   when the first fetch does not satisfy the limit (CASSANDRA-8856)
 + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
 + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
 + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
 + * Improve assertions in Memory (CASSANDRA-8792)
 + * Fix SSTableRewriter cleanup (CASSANDRA-8802)
 + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
 + * Enforce SSTableReader.first/last (CASSANDRA-8744)
 + * Cleanup SegmentedFile API (CASSANDRA-8749)
 + * Avoid overlap with early compaction replacement (CASSANDRA-8683)
 + * Safer Resource Management++ (CASSANDRA-8707)
 + * Write partition size estimates into a system table (CASSANDRA-7688)
 + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
 +   (CASSANDRA-8154)
 +Merged from 2.0:
+  * Preserve stream ID for more protocol errors (CASSANDRA-8848)
   * Fix combining token() function with multi-column relations on
     clustering columns (CASSANDRA-8797)
   * Make CFS.markReferenced() resistant to bad refcounting (CASSANDRA-8829)
@@@ -27,104 -8,10 +28,105 @@@
     table with ASC ordering and paging (CASSANDRA-8767)
   * AssertionError: "Memory was freed" when running cleanup (CASSANDRA-8716)
   * Make it possible to set max_sstable_age to fractional days (CASSANDRA-8406)
 - * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate()
 -   (CASSANDRA-8748)
   * Fix some multi-column relations with indexes on some clustering
     columns (CASSANDRA-8275)
-  * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate() (CASSANDRA-8748)
++ * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate()
++   (CASSANDRA-8748)
 + * Throw OOM if allocating memory fails to return a valid pointer (CASSANDRA-8726)
 + * Fix SSTableSimpleUnsortedWriter ConcurrentModificationException (CASSANDRA-8619)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableSimpleUnsortedWriter.close() terminates if
 +   disk writer has crashed (CASSANDRA-8807)
 +
 +
 +2.1.3
 + * Fix HSHA/offheap_objects corruption (CASSANDRA-8719)
 + * Upgrade libthrift to 0.9.2 (CASSANDRA-8685)
 + * Don't use the shared ref in sstableloader (CASSANDRA-8704)
 + * Purge internal prepared statements if related tables or
 +   keyspaces are dropped (CASSANDRA-8693)
 + * (cqlsh) Handle unicode BOM at start of files (CASSANDRA-8638)
 + * Stop compactions before exiting offline tools (CASSANDRA-8623)
 + * Update tools/stress/README.txt to match current behaviour (CASSANDRA-7933)
 + * Fix schema from Thrift conversion with empty metadata (CASSANDRA-8695)
 + * Safer Resource Management (CASSANDRA-7705)
 + * Make sure we compact highly overlapping cold sstables with
 +   STCS (CASSANDRA-8635)
 + * rpc_interface and listen_interface generate NPE on startup when specified
 +   interface doesn't exist (CASSANDRA-8677)
 + * Fix ArrayIndexOutOfBoundsException in nodetool cfhistograms (CASSANDRA-8514)
 + * Switch from yammer metrics for nodetool cf/proxy histograms (CASSANDRA-8662)
 + * Make sure we don't add tmplink files to the compaction
 +   strategy (CASSANDRA-8580)
 + * (cqlsh) Handle maps with blob keys (CASSANDRA-8372)
 + * (cqlsh) Handle DynamicCompositeType schemas correctly (CASSANDRA-8563)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6706)
 + * Add tooling to detect hot partitions (CASSANDRA-7974)
 + * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
 + * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
 + * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
 + * Invalidate prepared BATCH statements when related tables
 +   or keyspaces are dropped (CASSANDRA-8652)
 + * Fix missing results in secondary index queries on collections
 +   with ALLOW FILTERING (CASSANDRA-8421)
 + * Expose EstimatedHistogram metrics for range slices (CASSANDRA-8627)
 + * (cqlsh) Escape clqshrc passwords properly (CASSANDRA-8618)
 + * Fix NPE when passing wrong argument in ALTER TABLE statement (CASSANDRA-8355)
 + * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 + * Fix sstableupgrade throws exception (CASSANDRA-8688)
 + * Fix hang when repairing empty keyspace (CASSANDRA-8694)
 +Merged from 2.0:
   * Fix IllegalArgumentException in dynamic snitch (CASSANDRA-8448)
   * Add support for UPDATE ... IF EXISTS (CASSANDRA-8610)
   * Fix reversal of list prepends (CASSANDRA-8733)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e76ebcec/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Frame.java
index 01bee10,7520c41..2868ed4
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@@ -183,39 -162,29 +183,47 @@@ public class Fram
              if (version > Server.CURRENT_VERSION)
                  throw new ProtocolException("Invalid or unsupported protocol version: " + version);
  
 -            int flags = buffer.getByte(idx + 1);
 -            int streamId = buffer.getByte(idx + 2);
 +            // Wait until we have the complete V3+ header
 +            if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH)
 +                return;
 +
 +            int flags = buffer.getByte(idx++);
 +
 +            int streamId, headerLength;
 +            if (version >= Server.VERSION_3)
 +            {
 +                streamId = buffer.getShort(idx);
 +                idx += 2;
 +                headerLength = Header.MODERN_LENGTH;
 +            }
 +            else
 +            {
 +                streamId = buffer.getByte(idx);
 +                idx++;
 +                headerLength = Header.LEGACY_LENGTH;
 +            }
  
              // This throws a protocol exceptions if the opcode is unknown
-             Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx++), direction);
+             Message.Type type;
+             try
+             {
 -                type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
++                type = Message.Type.fromOpcode(buffer.getByte(idx++), direction);
+             }
+             catch (ProtocolException e)
+             {
+                 throw ErrorMessage.wrap(e, streamId);
+             }
  
 -            long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
 +            long bodyLength = buffer.getUnsignedInt(idx);
 +            idx += Header.BODY_LENGTH_SIZE;
  
              if (bodyLength < 0)
              {
 -                buffer.skipBytes(Header.LENGTH);
 +                buffer.skipBytes(headerLength);
-                 throw new ProtocolException("Invalid frame body length: " + bodyLength);
+                 throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId);
              }
  
 -            long frameLength = bodyLength + Header.LENGTH;
 +            long frameLength = bodyLength + headerLength;
              if (frameLength > MAX_FRAME_LENGTH)
              {
                  // Enter the discard mode and discard everything received so far.
@@@ -247,10 -215,14 +255,14 @@@
              }
              else if (connection.getVersion() != version)
              {
-                 throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
+                 throw ErrorMessage.wrap(
+                         new ProtocolException(String.format(
+                                 "Invalid message version. Got %d but previous messages on this connection had version %d",
+                                 version, connection.getVersion())),
+                         streamId);
              }
  
 -            return new Frame(new Header(version, flags, streamId, type), body);
 +            results.add(new Frame(new Header(version, flags, streamId, type), body));
          }
  
          private void fail()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e76ebcec/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7e4a3a9,e27fb88..a049a57
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@@ -17,9 -17,9 +17,10 @@@
   */
  package org.apache.cassandra.transport.messages;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import io.netty.buffer.ByteBuf;
 +import io.netty.handler.codec.CodecException;
  import com.google.common.base.Predicate;
 -import org.jboss.netty.buffer.ChannelBuffer;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -256,7 -244,8 +257,7 @@@ public class ErrorMessage extends Messa
          return new WrappedException(t, streamId);
      }
  
-     private static class WrappedException extends RuntimeException
 -    @VisibleForTesting
+     public static class WrappedException extends RuntimeException
      {
          private final int streamId;
  
@@@ -265,6 -254,11 +266,12 @@@
              super(cause);
              this.streamId = streamId;
          }
+ 
++        @VisibleForTesting
+         public int getStreamId()
+         {
+             return this.streamId;
+         }
      }
  
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e76ebcec/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
index 0000000,387d159..91f7355
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@@ -1,0 -1,97 +1,103 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.transport;
+ 
 -import org.jboss.netty.buffer.ChannelBuffer;
 -import org.jboss.netty.buffer.HeapChannelBufferFactory;
++import io.netty.buffer.ByteBuf;
++import io.netty.buffer.Unpooled;
+ import org.apache.cassandra.transport.messages.ErrorMessage;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
++import java.util.ArrayList;
++import java.util.List;
++
+ public class ProtocolErrorTest {
+ 
+     @Test
+     public void testInvalidDirection() throws Exception
+     {
+         Frame.Decoder dec = new Frame.Decoder(null);
+ 
++        List<Object> results = new ArrayList<>();
+         // should generate a protocol exception for using a response frame with
+         // a prepare op, ensure that it comes back with stream ID 1
+         byte[] frame = new byte[] {
+                 (byte) 0x82,  // direction & version
+                 0x00,  // flags
+                 0x01,  // stream ID
+                 0x09,  // opcode
+                 0x00, 0x00, 0x00, 0x21,  // body length
+                 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
+                 0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46,
+                 0x52, 0x4f, 0x4d, 0x20, 0x73, 0x79, 0x73, 0x74,
+                 0x65, 0x6d, 0x2e, 0x6c, 0x6f, 0x63, 0x61, 0x6c,
+                 0x3b
+         };
 -        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
++        ByteBuf buf = Unpooled.wrappedBuffer(frame);
+         try {
 -            dec.decode(null, null, buf);
++            dec.decode(null, buf, results);
+         } catch (ErrorMessage.WrappedException e) {
+             // make sure the exception has the correct stream ID
+             Assert.assertEquals(1, e.getStreamId());
+         }
+     }
+ 
+     @Test
+     public void testNegativeBodyLength() throws Exception
+     {
+         Frame.Decoder dec = new Frame.Decoder(null);
+ 
++        List<Object> results = new ArrayList<>();
+         byte[] frame = new byte[] {
+                 (byte) 0x82,  // direction & version
+                 0x00,  // flags
+                 0x01,  // stream ID
+                 0x09,  // opcode
+                 (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length (-1)
+         };
 -        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
++        ByteBuf buf = Unpooled.wrappedBuffer(frame);
+         try {
 -            dec.decode(null, null, buf);
++            dec.decode(null, buf, results);
+         } catch (ErrorMessage.WrappedException e) {
+             // make sure the exception has the correct stream ID
+             Assert.assertEquals(1, e.getStreamId());
+         }
+     }
+ 
+     @Test
+     public void testBodyLengthOverLimit() throws Exception
+     {
+         Frame.Decoder dec = new Frame.Decoder(null);
+ 
++        List<Object> results = new ArrayList<>();
+         byte[] frame = new byte[] {
+                 (byte) 0x82,  // direction & version
+                 0x00,  // flags
+                 0x01,  // stream ID
+                 0x09,  // opcode
+                 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length
+         };
 -        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
++        ByteBuf buf = Unpooled.wrappedBuffer(frame);
+         try {
 -            dec.decode(null, null, buf);
++            dec.decode(null, buf, results);
+         } catch (ErrorMessage.WrappedException e) {
+             // make sure the exception has the correct stream ID
+             Assert.assertEquals(1, e.getStreamId());
+         }
+     }
+ }