You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/03/10 23:38:44 UTC

cassandra git commit: Indicate partition key cols in PREPARED responses

Repository: cassandra
Updated Branches:
  refs/heads/trunk 9eebe874d -> dca37a612


Indicate partition key cols in PREPARED responses

Patch by Tyler Hobbs; reviewed by Benjamin Lerer for CASSANDRA-7660


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dca37a61
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dca37a61
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dca37a61

Branch: refs/heads/trunk
Commit: dca37a612cfba1b0eae07a5dd4f16a37407a2a51
Parents: 9eebe87
Author: Tyler Hobbs <ty...@apache.org>
Authored: Tue Mar 10 17:35:25 2015 -0500
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Tue Mar 10 17:36:23 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 doc/native_protocol_v4.spec                     |  71 ++++--
 .../cassandra/cql3/ColumnSpecification.java     |  37 ++-
 .../org/apache/cassandra/cql3/ResultSet.java    | 242 ++++++++++++++++---
 .../cassandra/cql3/VariableSpecifications.java  |  43 +++-
 .../cassandra/cql3/selection/Selection.java     |   6 +-
 .../cql3/statements/BatchStatement.java         |  23 +-
 .../cql3/statements/ModificationStatement.java  |   7 +-
 .../cql3/statements/ParsedStatement.java        |  10 +-
 .../cql3/statements/SelectStatement.java        |   4 +-
 .../org/apache/cassandra/transport/Frame.java   |   3 +-
 .../org/apache/cassandra/transport/Server.java  |   2 +-
 .../transport/messages/ResultMessage.java       |  28 +--
 .../cassandra/transport/SerDeserTest.java       |  31 +++
 14 files changed, 423 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d8c9b62..3053362 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Indicate partition key columns in response metadata for prepared
+   statements (CASSANDRA-7660)
  * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759)
  * Avoid memory allocation when searching index summary (CASSANDRA-8793)
  * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index a3a3fd3..03a5a50 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -616,23 +616,66 @@ Table of Contents
 
 4.2.5.4. Prepared
 
-  The result to a PREPARE message. The rest of the body of a Prepared result is:
+  The result to a PREPARE message. The body of a Prepared result is:
     <id><metadata><result_metadata>
   where:
     - <id> is [short bytes] representing the prepared query ID.
-    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2; you
-      can however assume that the Has_more_pages flag is always off) and
-      is the specification for the variable bound in this prepare statement.
-    - <result_metadata> is defined exactly as <metadata> but correspond to the
-      metadata for the resultSet that execute this query will yield. Note that
-      <result_metadata> may be empty (have the No_metadata flag and 0 columns, See
-      section 4.2.5.2) and will be for any query that is not a Select. There is
-      in fact never a guarantee that this will non-empty so client should protect
-      themselves accordingly. The presence of this information is an
-      optimization that allows to later execute the statement that has been
-      prepared without requesting the metadata (Skip_metadata flag in EXECUTE).
-      Clients can safely discard this metadata if they do not want to take
-      advantage of that optimization.
+    - <metadata> is composed of:
+        <flags><columns_count><pk_count>[<pk_index_1>...<pk_index_n>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+      where:
+        - <flags> is an [int]. The bits of <flags> provides information on the
+          formatting of the remaining informations. A flag is set if the bit
+          corresponding to its `mask` is set. Supported masks and their flags
+          are:
+            0x0001    Global_tables_spec: if set, only one table spec (keyspace
+                      and table name) is provided as <global_table_spec>. If not
+                      set, <global_table_spec> is not present.
+        - <columns_count> is an [int] representing the number of bind markers
+          in the prepared statement.  It defines the number of <col_spec_i>
+          elements.
+        - <pk_count> is an [int] representing the number of <pk_index_i>
+          elements to follow. If this value is zero, at least one of the
+          partition key columns in the table that the statement acts on
+          did not have a corresponding bind marker (or the bind marker
+          was wrapped in a function call).
+        - <pk_index_i> is a short that represents the index of the bind marker
+          that corresponds to the partition key column in position i.
+          For example, a <pk_index> sequence of [2, 0, 1] indicates that the
+          table has three partition key columns; the full partition key
+          can be constructed by creating a composite of the values for
+          the bind markers at index 2, at index 0, and at index 1.
+          This allows implementations with token-aware routing to correctly
+          construct the partition key without needing to inspect table
+          metadata.
+        - <global_table_spec> is present if the Global_tables_spec is set in
+          <flags>. If present, it is composed of two [string]s. The first
+          [string] is the name of the keyspace that the statement acts on.
+          The second [string] is the name of the table that the columns
+          represented by the bind markers belong to.
+        - <col_spec_i> specifies the bind markers in the prepared statement.
+          There are <column_count> such column specifications, each with the
+          following format:
+            (<ksname><tablename>)?<name><type>
+          The initial <ksname> and <tablename> are two [string] that are only
+          present if the Global_tables_spec flag is not set. The <name> field
+          is a [string] that holds the name of the bind marker (if named),
+          or the name of the column, field, or expression that the bind marker
+          corresponds to (if the bind marker is "anonymous").  The <type>
+          field is an [option] that represents the expected type of values for
+          the bind marker.  See the Rows documentation (section 4.2.5.2) for
+          full details on the <type> field.
+
+    - <result_metadata> is defined exactly the same as <metadata> in the Rows
+      documentation (section 4.2.5.2).  This describes the metadata for the
+      result set that will be returned when this prepared statement is executed.
+      Note that <result_metadata> may be empty (have the No_metadata flag and
+      0 columns, See section 4.2.5.2) and will be for any query that is not a
+      Select. In fact, there is never a guarantee that this will non-empty, so
+      implementations should protect themselves accordingly. This result metadata
+      is an optimization that allows implementations to later execute the
+      prepared statement without requesting the metadata (see the Skip_metadata
+      flag in EXECUTE).  Clients can safely discard this metadata if they do not
+      want to take advantage of that optimization.
 
   Note that prepared query ID return is global to the node on which the query
   has been prepared. It can be used on any connection to that node and this

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
index cc54375..bc5a914 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.cql3;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ReversedType;
 
+import java.util.Collection;
+import java.util.Iterator;
+
 public class ColumnSpecification
 {
     public final String ksName;
@@ -45,9 +48,41 @@ public class ColumnSpecification
     {
         return new ColumnSpecification(ksName, cfName, alias, type);
     }
-    
+
     public boolean isReversedType()
     {
         return type instanceof ReversedType;
     }
+
+    /**
+     * Returns true if all ColumnSpecifications are in the same table, false otherwise.
+     */
+    public static boolean allInSameTable(Collection<ColumnSpecification> names)
+    {
+        if (names == null || names.isEmpty())
+            return false;
+
+        Iterator<ColumnSpecification> iter = names.iterator();
+        ColumnSpecification first = iter.next();
+        while (iter.hasNext())
+        {
+            ColumnSpecification name = iter.next();
+            if (!name.ksName.equals(first.ksName) || !name.cfName.equals(first.cfName))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof ColumnSpecification))
+            return false;
+
+        ColumnSpecification that = (ColumnSpecification) other;
+        return this.ksName.equals(that.ksName) &&
+               this.cfName.equals(that.cfName) &&
+               this.name.equals(that.name) &&
+               this.type.equals(that.type);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 75a9c8e..c0982c4 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -38,15 +38,15 @@ public class ResultSet
     public static final Codec codec = new Codec();
     private static final ColumnIdentifier COUNT_COLUMN = new ColumnIdentifier("count", false);
 
-    public final Metadata metadata;
+    public final ResultMetadata metadata;
     public final List<List<ByteBuffer>> rows;
 
     public ResultSet(List<ColumnSpecification> metadata)
     {
-        this(new Metadata(metadata), new ArrayList<List<ByteBuffer>>());
+        this(new ResultMetadata(metadata), new ArrayList<List<ByteBuffer>>());
     }
 
-    public ResultSet(Metadata metadata, List<List<ByteBuffer>> rows)
+    public ResultSet(ResultMetadata metadata, List<List<ByteBuffer>> rows)
     {
         this.metadata = metadata;
         this.rows = rows;
@@ -180,7 +180,7 @@ public class ResultSet
          */
         public ResultSet decode(ByteBuf body, int version)
         {
-            Metadata m = Metadata.codec.decode(body, version);
+            ResultMetadata m = ResultMetadata.codec.decode(body, version);
             int rowCount = body.readInt();
             ResultSet rs = new ResultSet(m, new ArrayList<List<ByteBuffer>>(rowCount));
 
@@ -194,12 +194,12 @@ public class ResultSet
 
         public void encode(ResultSet rs, ByteBuf dest, int version)
         {
-            Metadata.codec.encode(rs.metadata, dest, version);
+            ResultMetadata.codec.encode(rs.metadata, dest, version);
             dest.writeInt(rs.rows.size());
             for (List<ByteBuffer> row : rs.rows)
             {
                 // Note that we do only want to serialize only the first columnCount values, even if the row
-                // as more: see comment on Metadata.names field.
+                // as more: see comment on ResultMetadata.names field.
                 for (int i = 0; i < rs.metadata.columnCount; i++)
                     CBUtil.writeValue(row.get(i), dest);
             }
@@ -207,7 +207,7 @@ public class ResultSet
 
         public int encodedSize(ResultSet rs, int version)
         {
-            int size = Metadata.codec.encodedSize(rs.metadata, version) + 4;
+            int size = ResultMetadata.codec.encodedSize(rs.metadata, version) + 4;
             for (List<ByteBuffer> row : rs.rows)
             {
                 for (int i = 0; i < rs.metadata.columnCount; i++)
@@ -217,11 +217,14 @@ public class ResultSet
         }
     }
 
-    public static class Metadata
+    /**
+     * The metadata for the results of executing a query or prepared statement.
+     */
+    public static class ResultMetadata
     {
-        public static final CBCodec<Metadata> codec = new Codec();
+        public static final CBCodec<ResultMetadata> codec = new Codec();
 
-        public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), null, 0, null);
+        public static final ResultMetadata EMPTY = new ResultMetadata(EnumSet.of(Flag.NO_METADATA), null, 0, null);
 
         private final EnumSet<Flag> flags;
         // Please note that columnCount can actually be smaller than names, even if names is not null. This is
@@ -232,14 +235,14 @@ public class ResultSet
         private final int columnCount;
         private PagingState pagingState;
 
-        public Metadata(List<ColumnSpecification> names)
+        public ResultMetadata(List<ColumnSpecification> names)
         {
             this(EnumSet.noneOf(Flag.class), names, names.size(), null);
-            if (!names.isEmpty() && allInSameCF())
+            if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
                 flags.add(Flag.GLOBAL_TABLES_SPEC);
         }
 
-        private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState)
+        private ResultMetadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState)
         {
             this.flags = flags;
             this.names = names;
@@ -247,9 +250,9 @@ public class ResultSet
             this.pagingState = pagingState;
         }
 
-        public Metadata copy()
+        public ResultMetadata copy()
         {
-            return new Metadata(EnumSet.copyOf(flags), names, columnCount, pagingState);
+            return new ResultMetadata(EnumSet.copyOf(flags), names, columnCount, pagingState);
         }
 
         // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911
@@ -265,24 +268,6 @@ public class ResultSet
             names.add(name);
         }
 
-        private boolean allInSameCF()
-        {
-            if (names == null)
-                return false;
-
-            assert !names.isEmpty();
-
-            Iterator<ColumnSpecification> iter = names.iterator();
-            ColumnSpecification first = iter.next();
-            while (iter.hasNext())
-            {
-                ColumnSpecification name = iter.next();
-                if (!name.ksName.equals(first.ksName) || !name.cfName.equals(first.cfName))
-                    return false;
-            }
-            return true;
-        }
-
         public void setHasMorePages(PagingState pagingState)
         {
             if (pagingState == null)
@@ -320,9 +305,9 @@ public class ResultSet
             return sb.toString();
         }
 
-        private static class Codec implements CBCodec<Metadata>
+        private static class Codec implements CBCodec<ResultMetadata>
         {
-            public Metadata decode(ByteBuf body, int version)
+            public ResultMetadata decode(ByteBuf body, int version)
             {
                 // flags & column count
                 int iflags = body.readInt();
@@ -335,7 +320,7 @@ public class ResultSet
                     state = PagingState.deserialize(CBUtil.readValue(body));
 
                 if (flags.contains(Flag.NO_METADATA))
-                    return new Metadata(flags, null, columnCount, state);
+                    return new ResultMetadata(flags, null, columnCount, state);
 
                 boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -357,10 +342,10 @@ public class ResultSet
                     AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
-                return new Metadata(flags, names, names.size(), state);
+                return new ResultMetadata(flags, names, names.size(), state);
             }
 
-            public void encode(Metadata m, ByteBuf dest, int version)
+            public void encode(ResultMetadata m, ByteBuf dest, int version)
             {
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
@@ -396,7 +381,7 @@ public class ResultSet
                 }
             }
 
-            public int encodedSize(Metadata m, int version)
+            public int encodedSize(ResultMetadata m, int version)
             {
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
@@ -431,6 +416,185 @@ public class ResultSet
         }
     }
 
+    /**
+     * The metadata for the query parameters in a prepared statement.
+     */
+    public static class PreparedMetadata
+    {
+        public static final CBCodec<PreparedMetadata> codec = new Codec();
+
+        private final EnumSet<Flag> flags;
+        public final List<ColumnSpecification> names;
+        private final Short[] partitionKeyBindIndexes;
+
+        public PreparedMetadata(List<ColumnSpecification> names, Short[] partitionKeyBindIndexes)
+        {
+            this(EnumSet.noneOf(Flag.class), names, partitionKeyBindIndexes);
+            if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
+                flags.add(Flag.GLOBAL_TABLES_SPEC);
+        }
+
+        private PreparedMetadata(EnumSet<Flag> flags, List<ColumnSpecification> names, Short[] partitionKeyBindIndexes)
+        {
+            this.flags = flags;
+            this.names = names;
+            this.partitionKeyBindIndexes = partitionKeyBindIndexes;
+        }
+
+        public PreparedMetadata copy()
+        {
+            return new PreparedMetadata(EnumSet.copyOf(flags), names, partitionKeyBindIndexes);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof PreparedMetadata))
+                return false;
+
+            PreparedMetadata that = (PreparedMetadata) other;
+            return this.names.equals(that.names) &&
+                   this.flags.equals(that.flags) &&
+                   Arrays.equals(this.partitionKeyBindIndexes, that.partitionKeyBindIndexes);
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            for (ColumnSpecification name : names)
+            {
+                sb.append("[").append(name.name);
+                sb.append("(").append(name.ksName).append(", ").append(name.cfName).append(")");
+                sb.append(", ").append(name.type).append("]");
+            }
+
+            sb.append(", bindIndexes=[");
+            if (partitionKeyBindIndexes != null)
+            {
+                for (int i = 0; i < partitionKeyBindIndexes.length; i++)
+                {
+                    if (i > 0)
+                        sb.append(", ");
+                    sb.append(partitionKeyBindIndexes[i]);
+                }
+            }
+            sb.append("]");
+            return sb.toString();
+        }
+
+        private static class Codec implements CBCodec<PreparedMetadata>
+        {
+            public PreparedMetadata decode(ByteBuf body, int version)
+            {
+                // flags & column count
+                int iflags = body.readInt();
+                int columnCount = body.readInt();
+
+                EnumSet<Flag> flags = Flag.deserialize(iflags);
+
+                Short[] partitionKeyBindIndexes = null;
+                if (version >= Server.VERSION_4)
+                {
+                    int numPKNames = body.readInt();
+                    if (numPKNames > 0)
+                    {
+                        partitionKeyBindIndexes = new Short[numPKNames];
+                        for (int i = 0; i < numPKNames; i++)
+                            partitionKeyBindIndexes[i] = body.readShort();
+                    }
+                }
+
+                boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
+
+                String globalKsName = null;
+                String globalCfName = null;
+                if (globalTablesSpec)
+                {
+                    globalKsName = CBUtil.readString(body);
+                    globalCfName = CBUtil.readString(body);
+                }
+
+                // metadata (names/types)
+                List<ColumnSpecification> names = new ArrayList<>(columnCount);
+                for (int i = 0; i < columnCount; i++)
+                {
+                    String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
+                    String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
+                    ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
+                    AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
+                    names.add(new ColumnSpecification(ksName, cfName, colName, type));
+                }
+                return new PreparedMetadata(flags, names, partitionKeyBindIndexes);
+            }
+
+            public void encode(PreparedMetadata m, ByteBuf dest, int version)
+            {
+                boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+                dest.writeInt(Flag.serialize(m.flags));
+                dest.writeInt(m.names.size());
+
+                if (version >= Server.VERSION_4)
+                {
+                    // there's no point in providing partition key bind indexes if the statements affect multiple tables
+                    if (m.partitionKeyBindIndexes == null || !globalTablesSpec)
+                    {
+                        dest.writeInt(0);
+                    }
+                    else
+                    {
+                        dest.writeInt(m.partitionKeyBindIndexes.length);
+                        for (Short bindIndex : m.partitionKeyBindIndexes)
+                            dest.writeShort(bindIndex);
+                    }
+                }
+
+                if (globalTablesSpec)
+                {
+                    CBUtil.writeString(m.names.get(0).ksName, dest);
+                    CBUtil.writeString(m.names.get(0).cfName, dest);
+                }
+
+                for (ColumnSpecification name : m.names)
+                {
+                    if (!globalTablesSpec)
+                    {
+                        CBUtil.writeString(name.ksName, dest);
+                        CBUtil.writeString(name.cfName, dest);
+                    }
+                    CBUtil.writeString(name.name.toString(), dest);
+                    DataType.codec.writeOne(DataType.fromType(name.type, version), dest, version);
+                }
+            }
+
+            public int encodedSize(PreparedMetadata m, int version)
+            {
+                boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+                int size = 8;
+                if (globalTablesSpec)
+                {
+                    size += CBUtil.sizeOfString(m.names.get(0).ksName);
+                    size += CBUtil.sizeOfString(m.names.get(0).cfName);
+                }
+
+                if (m.partitionKeyBindIndexes != null && version >= 4)
+                    size += 4 + 2 * m.partitionKeyBindIndexes.length;
+
+                for (ColumnSpecification name : m.names)
+                {
+                    if (!globalTablesSpec)
+                    {
+                        size += CBUtil.sizeOfString(name.ksName);
+                        size += CBUtil.sizeOfString(name.cfName);
+                    }
+                    size += CBUtil.sizeOfString(name.name.toString());
+                    size += DataType.codec.oneSerializedSize(DataType.fromType(name.type, version), version);
+                }
+                return size;
+            }
+        }
+    }
+
     public static enum Flag
     {
         // The order of that enum matters!!

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index 0a55ced..5304350 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -25,11 +28,13 @@ public class VariableSpecifications
 {
     private final List<ColumnIdentifier> variableNames;
     private final ColumnSpecification[] specs;
+    private final ColumnDefinition[] targetColumns;
 
     public VariableSpecifications(List<ColumnIdentifier> variableNames)
     {
         this.variableNames = variableNames;
         this.specs = new ColumnSpecification[variableNames.size()];
+        this.targetColumns = new ColumnDefinition[variableNames.size()];
     }
 
     /**
@@ -51,12 +56,44 @@ public class VariableSpecifications
         return Arrays.asList(specs);
     }
 
+    /**
+     * Returns an array with the same length as the number of partition key columns for the table corresponding
+     * to cfm.  Each short in the array represents the bind index of the marker that holds the value for that
+     * partition key column.  If there are no bind markers for any of the partition key columns, null is returned.
+     *
+     * Callers of this method should ensure that all statements operate on the same table.
+     */
+    public Short[] getPartitionKeyBindIndexes(CFMetaData cfm)
+    {
+        Short[] partitionKeyPositions = new Short[cfm.partitionKeyColumns().size()];
+        for (int i = 0; i < targetColumns.length; i++)
+        {
+            ColumnDefinition targetColumn = targetColumns[i];
+            if (targetColumn != null && targetColumn.isPartitionKey())
+            {
+                assert targetColumn.ksName.equals(cfm.ksName) && targetColumn.cfName.equals(cfm.cfName);
+                partitionKeyPositions[targetColumn.position()] = (short) i;
+            }
+        }
+
+        for (Short bindIndex : partitionKeyPositions)
+        {
+            if (bindIndex == null)
+                return null;
+        }
+
+        return partitionKeyPositions;
+    }
+
     public void add(int bindIndex, ColumnSpecification spec)
     {
-        ColumnIdentifier name = variableNames.get(bindIndex);
+        if (spec instanceof ColumnDefinition)
+            targetColumns[bindIndex] = (ColumnDefinition) spec;
+
+        ColumnIdentifier bindMarkerName = variableNames.get(bindIndex);
         // Use the user name, if there is one
-        if (name != null)
-            spec = new ColumnSpecification(spec.ksName, spec.cfName, name, spec.type);
+        if (bindMarkerName != null)
+            spec = new ColumnSpecification(spec.ksName, spec.cfName, bindMarkerName, spec.type);
         specs[bindIndex] = spec;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 5d3a125..eb796b0 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -54,7 +54,7 @@ public abstract class Selection
 
     private final CFMetaData cfm;
     private final Collection<ColumnDefinition> columns;
-    private final ResultSet.Metadata metadata;
+    private final ResultSet.ResultMetadata metadata;
     private final boolean collectTimestamps;
     private final boolean collectTTLs;
 
@@ -66,7 +66,7 @@ public abstract class Selection
     {
         this.cfm = cfm;
         this.columns = columns;
-        this.metadata = new ResultSet.Metadata(metadata);
+        this.metadata = new ResultSet.ResultMetadata(metadata);
         this.collectTimestamps = collectTimestamps;
         this.collectTTLs = collectTTLs;
     }
@@ -147,7 +147,7 @@ public abstract class Selection
            });
     }
 
-    public ResultSet.Metadata getResultMetadata()
+    public ResultSet.ResultMetadata getResultMetadata()
     {
         return metadata;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index a0aff3f..59f8f27 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -403,9 +403,25 @@ public class BatchStatement implements CQLStatement
         {
             VariableSpecifications boundNames = getBoundVariables();
 
+            String firstKS = null;
+            String firstCF = null;
+            boolean haveMultipleCFs = false;
+
             List<ModificationStatement> statements = new ArrayList<>(parsedStatements.size());
             for (ModificationStatement.Parsed parsed : parsedStatements)
+            {
+                if (firstKS == null)
+                {
+                    firstKS = parsed.keyspace();
+                    firstCF = parsed.columnFamily();
+                }
+                else if (!haveMultipleCFs)
+                {
+                    haveMultipleCFs = !firstKS.equals(parsed.keyspace()) || !firstCF.equals(parsed.columnFamily());
+                }
+
                 statements.add(parsed.prepare(boundNames));
+            }
 
             Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
             prepAttrs.collectMarkerSpecification(boundNames);
@@ -413,7 +429,12 @@ public class BatchStatement implements CQLStatement
             BatchStatement batchStatement = new BatchStatement(boundNames.size(), type, statements, prepAttrs);
             batchStatement.validate();
 
-            return new ParsedStatement.Prepared(batchStatement, boundNames);
+            // Use the CFMetadata of the first statement for partition key bind indexes.  If the statements affect
+            // multiple tables, we won't send partition key bind indexes.
+            Short[] partitionKeyBindIndexes = haveMultipleCFs ? null
+                                                              : boundNames.getPartitionKeyBindIndexes(batchStatement.statements.get(0).cfm);
+
+            return new ParsedStatement.Prepared(batchStatement, boundNames, partitionKeyBindIndexes);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 683ed49..8945d1d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -538,7 +538,7 @@ public abstract class ModificationStatement implements CQLStatement
         boolean success = cf == null;
 
         ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
-        ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(spec));
+        ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec));
         List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);
@@ -565,7 +565,7 @@ public abstract class ModificationStatement implements CQLStatement
             row.addAll(right.rows.get(i));
             rows.add(row);
         }
-        return new ResultSet(new ResultSet.Metadata(specs), rows);
+        return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
     }
 
     private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
@@ -690,7 +690,8 @@ public abstract class ModificationStatement implements CQLStatement
         {
             VariableSpecifications boundNames = getBoundVariables();
             ModificationStatement statement = prepare(boundNames);
-            return new ParsedStatement.Prepared(statement, boundNames);
+            CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+            return new ParsedStatement.Prepared(statement, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
         }
 
         public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index bcce9ce..c3e0639 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@ -44,21 +44,23 @@ public abstract class ParsedStatement
     {
         public final CQLStatement statement;
         public final List<ColumnSpecification> boundNames;
+        public final Short[] partitionKeyBindIndexes;
 
-        public Prepared(CQLStatement statement, List<ColumnSpecification> boundNames)
+        protected Prepared(CQLStatement statement, List<ColumnSpecification> boundNames, Short[] partitionKeyBindIndexes)
         {
             this.statement = statement;
             this.boundNames = boundNames;
+            this.partitionKeyBindIndexes = partitionKeyBindIndexes;
         }
 
-        public Prepared(CQLStatement statement, VariableSpecifications names)
+        public Prepared(CQLStatement statement, VariableSpecifications names, Short[] partitionKeyBindIndexes)
         {
-            this(statement, names.getSpecifications());
+            this(statement, names.getSpecifications(), partitionKeyBindIndexes);
         }
 
         public Prepared(CQLStatement statement)
         {
-            this(statement, Collections.<ColumnSpecification>emptyList());
+            this(statement, Collections.<ColumnSpecification>emptyList(), null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7094b6c..c73360c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -133,7 +133,7 @@ public class SelectStatement implements CQLStatement
                                    null);
     }
 
-    public ResultSet.Metadata getResultMetadata()
+    public ResultSet.ResultMetadata getResultMetadata()
     {
         return selection.getResultMetadata();
     }
@@ -766,7 +766,7 @@ public class SelectStatement implements CQLStatement
                                                         orderingComparator,
                                                         prepareLimit(boundNames));
 
-            return new ParsedStatement.Prepared(stmt, boundNames);
+            return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 2868ed4..7591c83 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -181,7 +181,8 @@ public class Frame
             int version = firstByte & 0x7F;
 
             if (version > Server.CURRENT_VERSION)
-                throw new ProtocolException("Invalid or unsupported protocol version: " + version);
+                throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); highest supported is %d ",
+                                                          version, Server.CURRENT_VERSION));
 
             // Wait until we have the complete V3+ header
             if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 9133a5a..6aa929b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -67,7 +67,7 @@ public class Server implements CassandraDaemon.Server
     public static final int VERSION_2 = 2;
     public static final int VERSION_3 = 3;
     public static final int VERSION_4 = 4;
-    public static final int CURRENT_VERSION = VERSION_3;
+    public static final int CURRENT_VERSION = VERSION_4;
 
     private final ConnectionTracker connectionTracker = new ConnectionTracker();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index f9d3a13..b76243f 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -238,11 +238,11 @@ public abstract class ResultMessage extends Message.Response
             public ResultMessage decode(ByteBuf body, int version)
             {
                 MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
-                ResultSet.Metadata metadata = ResultSet.Metadata.codec.decode(body, version);
+                ResultSet.PreparedMetadata metadata = ResultSet.PreparedMetadata.codec.decode(body, version);
 
-                ResultSet.Metadata resultMetadata = ResultSet.Metadata.EMPTY;
+                ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.EMPTY;
                 if (version > 1)
-                    resultMetadata = ResultSet.Metadata.codec.decode(body, version);
+                    resultMetadata = ResultSet.ResultMetadata.codec.decode(body, version);
 
                 return new Prepared(id, -1, metadata, resultMetadata);
             }
@@ -254,9 +254,9 @@ public abstract class ResultMessage extends Message.Response
                 assert prepared.statementId != null;
 
                 CBUtil.writeBytes(prepared.statementId.bytes, dest);
-                ResultSet.Metadata.codec.encode(prepared.metadata, dest, version);
+                ResultSet.PreparedMetadata.codec.encode(prepared.metadata, dest, version);
                 if (version > 1)
-                    ResultSet.Metadata.codec.encode(prepared.resultMetadata, dest, version);
+                    ResultSet.ResultMetadata.codec.encode(prepared.resultMetadata, dest, version);
             }
 
             public int encodedSize(ResultMessage msg, int version)
@@ -267,9 +267,9 @@ public abstract class ResultMessage extends Message.Response
 
                 int size = 0;
                 size += CBUtil.sizeOfBytes(prepared.statementId.bytes);
-                size += ResultSet.Metadata.codec.encodedSize(prepared.metadata, version);
+                size += ResultSet.PreparedMetadata.codec.encodedSize(prepared.metadata, version);
                 if (version > 1)
-                    size += ResultSet.Metadata.codec.encodedSize(prepared.resultMetadata, version);
+                    size += ResultSet.ResultMetadata.codec.encodedSize(prepared.resultMetadata, version);
                 return size;
             }
         };
@@ -277,25 +277,25 @@ public abstract class ResultMessage extends Message.Response
         public final MD5Digest statementId;
 
         /** Describes the variables to be bound in the prepared statement */
-        public final ResultSet.Metadata metadata;
+        public final ResultSet.PreparedMetadata metadata;
 
         /** Describes the results of executing this prepared statement */
-        public final ResultSet.Metadata resultMetadata;
+        public final ResultSet.ResultMetadata resultMetadata;
 
         // statement id for CQL-over-thrift compatibility. The binary protocol ignore that.
         private final int thriftStatementId;
 
         public Prepared(MD5Digest statementId, ParsedStatement.Prepared prepared)
         {
-            this(statementId, -1, new ResultSet.Metadata(prepared.boundNames), extractResultMetadata(prepared.statement));
+            this(statementId, -1, new ResultSet.PreparedMetadata(prepared.boundNames, prepared.partitionKeyBindIndexes), extractResultMetadata(prepared.statement));
         }
 
         public static Prepared forThrift(int statementId, List<ColumnSpecification> names)
         {
-            return new Prepared(null, statementId, new ResultSet.Metadata(names), ResultSet.Metadata.EMPTY);
+            return new Prepared(null, statementId, new ResultSet.PreparedMetadata(names, null), ResultSet.ResultMetadata.EMPTY);
         }
 
-        private Prepared(MD5Digest statementId, int thriftStatementId, ResultSet.Metadata metadata, ResultSet.Metadata resultMetadata)
+        private Prepared(MD5Digest statementId, int thriftStatementId, ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata)
         {
             super(Kind.PREPARED);
             this.statementId = statementId;
@@ -304,10 +304,10 @@ public abstract class ResultMessage extends Message.Response
             this.resultMetadata = resultMetadata;
         }
 
-        private static ResultSet.Metadata extractResultMetadata(CQLStatement statement)
+        private static ResultSet.ResultMetadata extractResultMetadata(CQLStatement statement)
         {
             if (!(statement instanceof SelectStatement))
-                return ResultSet.Metadata.EMPTY;
+                return ResultSet.ResultMetadata.EMPTY;
 
             return ((SelectStatement)statement).getResultMetadata();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dca37a61/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 39bd58b..352327e 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertNotSame;
 
 /**
  * Serialization/deserialization tests for protocol objects and messages.
@@ -227,4 +228,34 @@ public class SerDeserTest
         m.put("foo", 24L);
         assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(fields[3], 3));
     }
+
+    @Test
+    public void preparedMetadataSerializationTest()
+    {
+        List<ColumnSpecification> columnNames = new ArrayList<>();
+        for (int i = 0; i < 3; i++)
+            columnNames.add(new ColumnSpecification("ks", "cf", new ColumnIdentifier("col" + i, false), Int32Type.instance));
+
+        ResultSet.PreparedMetadata meta = new ResultSet.PreparedMetadata(columnNames, new Short[]{2, 1});
+        ByteBuf buf = Unpooled.buffer(meta.codec.encodedSize(meta, Server.VERSION_4));
+        meta.codec.encode(meta, buf, Server.VERSION_4);
+        ResultSet.PreparedMetadata decodedMeta = meta.codec.decode(buf, Server.VERSION_4);
+
+        assertEquals(meta, decodedMeta);
+
+        // v3 encoding doesn't include partition key bind indexes
+        buf = Unpooled.buffer(meta.codec.encodedSize(meta, Server.VERSION_3));
+        meta.codec.encode(meta, buf, Server.VERSION_3);
+        decodedMeta = meta.codec.decode(buf, Server.VERSION_3);
+
+        assertNotSame(meta, decodedMeta);
+
+        // however, if there are no partition key indexes, they should be the same
+        ResultSet.PreparedMetadata metaWithoutIndexes = new ResultSet.PreparedMetadata(columnNames, null);
+        buf = Unpooled.buffer(metaWithoutIndexes.codec.encodedSize(metaWithoutIndexes, Server.VERSION_4));
+        metaWithoutIndexes.codec.encode(metaWithoutIndexes, buf, Server.VERSION_4);
+        ResultSet.PreparedMetadata decodedMetaWithoutIndexes = metaWithoutIndexes.codec.decode(buf, Server.VERSION_4);
+
+        assertEquals(decodedMeta, decodedMetaWithoutIndexes);
+    }
 }