You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:23:02 UTC
[1/5] Native protocol v3
Repository: cassandra
Updated Branches:
refs/heads/trunk ad3424707 -> 6b4d98035
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index ef30a22..ec96ed1 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -61,8 +62,11 @@ public class BatchMessage extends Message.Request
throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
variables.add(CBUtil.readValueList(body));
}
- ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- return new BatchMessage(toType(type), queryOrIds, variables, consistency);
+ QueryOptions options = version < 3
+ ? QueryOptions.fromPreV3Batch(CBUtil.readConsistencyLevel(body))
+ : QueryOptions.codec.decode(body, version);
+
+ return new BatchMessage(toType(type), queryOrIds, variables, options);
}
public void encode(BatchMessage msg, ByteBuf dest, int version)
@@ -84,7 +88,10 @@ public class BatchMessage extends Message.Request
CBUtil.writeValueList(msg.values.get(i), dest);
}
- CBUtil.writeConsistencyLevel(msg.consistency, dest);
+ if (version < 3)
+ CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+ else
+ QueryOptions.codec.encode(msg.options, dest, version);
}
public int encodedSize(BatchMessage msg, int version)
@@ -99,7 +106,9 @@ public class BatchMessage extends Message.Request
size += CBUtil.sizeOfValueList(msg.values.get(i));
}
- size += CBUtil.sizeOfConsistencyLevel(msg.consistency);
+ size += version < 3
+ ? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency())
+ : QueryOptions.codec.encodedSize(msg.options, version);
return size;
}
@@ -131,15 +140,15 @@ public class BatchMessage extends Message.Request
public final BatchStatement.Type type;
public final List<Object> queryOrIdList;
public final List<List<ByteBuffer>> values;
- public final ConsistencyLevel consistency;
+ public final QueryOptions options;
- public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency)
+ public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, QueryOptions options)
{
super(Message.Type.BATCH);
this.type = type;
this.queryOrIdList = queryOrIdList;
this.values = values;
- this.consistency = consistency;
+ this.options = options;
}
public Message.Response execute(QueryState state)
@@ -161,27 +170,39 @@ public class BatchMessage extends Message.Request
}
QueryHandler handler = state.getClientState().getCQLQueryHandler();
- List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
+ List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
for (int i = 0; i < queryOrIdList.size(); i++)
{
Object query = queryOrIdList.get(i);
- CQLStatement statement;
+ ParsedStatement.Prepared p;
if (query instanceof String)
{
- statement = QueryProcessor.parseStatement((String)query, state);
+ p = QueryProcessor.parseStatement((String)query, state);
}
else
{
- statement = handler.getPrepared((MD5Digest)query);
- if (statement == null)
+ p = handler.getPrepared((MD5Digest)query);
+ if (p == null)
throw new PreparedQueryNotFoundException((MD5Digest)query);
}
List<ByteBuffer> queryValues = values.get(i);
- if (queryValues.size() != statement.getBoundTerms())
+ if (queryValues.size() != p.statement.getBoundTerms())
throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables",
- statement.getBoundTerms(),
+ p.statement.getBoundTerms(),
queryValues.size()));
+
+ prepared.add(p);
+ }
+
+ BatchQueryOptions batchOptions = BatchQueryOptions.withPerStatementVariables(options, values, queryOrIdList);
+ List<ModificationStatement> statements = new ArrayList<>(prepared.size());
+ for (int i = 0; i < prepared.size(); i++)
+ {
+ ParsedStatement.Prepared p = prepared.get(i);
+ batchOptions.forStatement(i).prepare(p.boundNames);
+ CQLStatement statement = p.statement;
+
if (!(statement instanceof ModificationStatement))
throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
@@ -202,7 +223,7 @@ public class BatchMessage extends Message.Request
// Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
// (and no value would be really correct, so we prefer passing a clearly wrong one).
BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none());
- Message.Response response = handler.processBatch(batch, state, new BatchQueryOptions(consistency, values, queryOrIdList));
+ Message.Response response = handler.processBatch(batch, state, batchOptions);
if (tracingId != null)
response.setTracingId(tracingId);
@@ -229,7 +250,7 @@ public class BatchMessage extends Message.Request
if (i > 0) sb.append(", ");
sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values");
}
- sb.append("] at consistency ").append(consistency);
+ sb.append("] at consistency ").append(options.getConsistency());
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 890b9d1..f3ab526 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -28,17 +28,17 @@ public class EventMessage extends Message.Response
{
public EventMessage decode(ByteBuf body, int version)
{
- return new EventMessage(Event.deserialize(body));
+ return new EventMessage(Event.deserialize(body, version));
}
public void encode(EventMessage msg, ByteBuf dest, int version)
{
- msg.event.serialize(dest);
+ msg.event.serialize(dest, version);
}
public int encodedSize(EventMessage msg, int version)
{
- return msg.event.serializedSize();
+ return msg.event.serializedSize(version);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index caec43f..d618f43 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.service.QueryState;
@@ -100,7 +101,9 @@ public class ExecuteMessage extends Message.Request
try
{
QueryHandler handler = state.getClientState().getCQLQueryHandler();
- CQLStatement statement = handler.getPrepared(statementId);
+ ParsedStatement.Prepared prepared = handler.getPrepared(statementId);
+ options.prepare(prepared.boundNames);
+ CQLStatement statement = prepared.statement;
if (statement == null)
throw new PreparedQueryNotFoundException(statementId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
new file mode 100644
index 0000000..9b66efb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
+
+import org.junit.Test;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.transport.Event.TopologyChange;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.StatusChange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * Serialization/deserialization tests for protocol objects and messages.
+ */
+public class SerDeserTest
+{
+ @Test
+ public void collectionSerDeserTest() throws Exception
+ {
+ collectionSerDeserTest(2);
+ collectionSerDeserTest(3);
+ }
+
+ public void collectionSerDeserTest(int version) throws Exception
+ {
+ // Lists
+ ListType<?> lt = ListType.getInstance(Int32Type.instance);
+ List<Integer> l = Arrays.asList(2, 6, 1, 9);
+
+ List<ByteBuffer> lb = new ArrayList<>(l.size());
+ for (Integer i : l)
+ lb.add(Int32Type.instance.decompose(i));
+
+ assertEquals(l, lt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(lb, lb.size(), version), version));
+
+ // Sets
+ SetType<?> st = SetType.getInstance(UTF8Type.instance);
+ Set<String> s = new LinkedHashSet<>();
+ s.addAll(Arrays.asList("bar", "foo", "zee"));
+
+ List<ByteBuffer> sb = new ArrayList<>(s.size());
+ for (String t : s)
+ sb.add(UTF8Type.instance.decompose(t));
+
+ assertEquals(s, st.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(sb, sb.size(), version), version));
+
+ // Maps
+ MapType<?, ?> mt = MapType.getInstance(UTF8Type.instance, LongType.instance);
+ Map<String, Long> m = new LinkedHashMap<>();
+ m.put("bar", 12L);
+ m.put("foo", 42L);
+ m.put("zee", 14L);
+
+ List<ByteBuffer> mb = new ArrayList<>(m.size() * 2);
+ for (Map.Entry<String, Long> entry : m.entrySet())
+ {
+ mb.add(UTF8Type.instance.decompose(entry.getKey()));
+ mb.add(LongType.instance.decompose(entry.getValue()));
+ }
+
+ assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(mb, m.size(), version), version));
+ }
+
+ @Test
+ public void eventSerDeserTest() throws Exception
+ {
+ eventSerDeserTest(2);
+ eventSerDeserTest(3);
+ }
+
+ public void eventSerDeserTest(int version) throws Exception
+ {
+ List<Event> events = new ArrayList<>();
+
+ events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddress(), 42));
+ events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddress(), 42));
+ events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddress(), 42));
+
+ events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddress(), 42));
+ events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddress(), 42));
+
+ events.add(new SchemaChange(SchemaChange.Change.CREATED, "ks"));
+ events.add(new SchemaChange(SchemaChange.Change.UPDATED, "ks"));
+ events.add(new SchemaChange(SchemaChange.Change.DROPPED, "ks"));
+
+ events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.TABLE, "ks", "table"));
+ events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.TABLE, "ks", "table"));
+ events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TABLE, "ks", "table"));
+
+ if (version >= 3)
+ {
+ events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.TYPE, "ks", "type"));
+ events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.TYPE, "ks", "type"));
+ events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TYPE, "ks", "type"));
+ }
+
+ for (Event ev : events)
+ {
+ ByteBuf buf = Unpooled.buffer(ev.serializedSize(version));
+ ev.serialize(buf, version);
+ assertEquals(ev, Event.deserialize(buf, version));
+ }
+ }
+
+ private static ByteBuffer bb(String str)
+ {
+ return UTF8Type.instance.decompose(str);
+ }
+
+ private static ColumnIdentifier ci(String name)
+ {
+ return new ColumnIdentifier(name, false);
+ }
+
+ private static Constants.Literal lit(long v)
+ {
+ return Constants.Literal.integer(String.valueOf(v));
+ }
+
+ private static Constants.Literal lit(String v)
+ {
+ return Constants.Literal.string(v);
+ }
+
+ private static ColumnSpecification columnSpec(String name, AbstractType<?> type)
+ {
+ return new ColumnSpecification("ks", "cf", ci(name), type);
+ }
+
+ @Test
+ public void udtSerDeserTest() throws Exception
+ {
+ udtSerDeserTest(2);
+ udtSerDeserTest(3);
+ }
+
+ public void udtSerDeserTest(int version) throws Exception
+ {
+ ListType<?> lt = ListType.getInstance(Int32Type.instance);
+ SetType<?> st = SetType.getInstance(UTF8Type.instance);
+ MapType<?, ?> mt = MapType.getInstance(UTF8Type.instance, LongType.instance);
+
+ UserType udt = new UserType("ks",
+ bb("myType"),
+ Arrays.asList(bb("f1"), bb("f2"), bb("f3"), bb("f4")),
+ Arrays.asList(LongType.instance, lt, st, mt));
+
+ Map<ColumnIdentifier, Term.Raw> value = new HashMap<>();
+ value.put(ci("f1"), lit(42));
+ value.put(ci("f2"), new Lists.Literal(Arrays.<Term.Raw>asList(lit(3), lit(1))));
+ value.put(ci("f3"), new Sets.Literal(Arrays.<Term.Raw>asList(lit("foo"), lit("bar"))));
+ value.put(ci("f4"), new Maps.Literal(Arrays.<Pair<Term.Raw, Term.Raw>>asList(
+ Pair.<Term.Raw, Term.Raw>create(lit("foo"), lit(24)),
+ Pair.<Term.Raw, Term.Raw>create(lit("bar"), lit(12)))));
+
+ UserTypes.Literal u = new UserTypes.Literal(value);
+ Term t = u.prepare("ks", columnSpec("myValue", udt));
+
+ QueryOptions options = QueryOptions.DEFAULT;
+ if (version == 2)
+ options = QueryOptions.fromProtocolV2(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+ else if (version != 3)
+ throw new AssertionError("Invalid protocol version for test");
+
+ ByteBuffer serialized = t.bindAndGet(options);
+
+ ByteBuffer[] fields = udt.split(serialized);
+
+ assertEquals(4, fields.length);
+
+ assertEquals(bytes(42L), fields[0]);
+
+ // Note that no matter what the protocol version has been used in bindAndGet above, the collections inside
+ // a UDT should alway be serialized with version 3 of the protocol. Which is why we don't use 'version'
+ // on purpose below.
+
+ assertEquals(Arrays.asList(3, 1), lt.getSerializer().deserializeForNativeProtocol(fields[1], 3));
+
+ LinkedHashSet<String> s = new LinkedHashSet<>();
+ s.addAll(Arrays.asList("bar", "foo"));
+ assertEquals(s, st.getSerializer().deserializeForNativeProtocol(fields[2], 3));
+
+ LinkedHashMap<String, Long> m = new LinkedHashMap<>();
+ m.put("bar", 12L);
+ m.put("foo", 24L);
+ assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(fields[3], 3));
+ }
+}
[4/5] git commit: Native protocol v3
Posted by sl...@apache.org.
Native protocol v3
patch by slebresne; reviewed by thobbs for CASSANDRA-6855
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9872b74e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9872b74e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9872b74e
Branch: refs/heads/trunk
Commit: 9872b74ef20018e4e7645a8952fd7295e75764ad
Parents: ece3864
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Mar 12 18:58:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Apr 30 20:21:35 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
doc/native_protocol_v3.spec | 911 +++++++++++++++++++
src/java/org/apache/cassandra/auth/Auth.java | 16 +-
.../cassandra/auth/CassandraAuthorizer.java | 6 +-
.../cassandra/auth/PasswordAuthenticator.java | 4 +-
.../org/apache/cassandra/cql3/Attributes.java | 8 +-
.../cassandra/cql3/BatchQueryOptions.java | 81 +-
.../apache/cassandra/cql3/ColumnCondition.java | 32 +-
.../org/apache/cassandra/cql3/Constants.java | 20 +-
src/java/org/apache/cassandra/cql3/Lists.java | 34 +-
src/java/org/apache/cassandra/cql3/Maps.java | 32 +-
.../org/apache/cassandra/cql3/QueryHandler.java | 3 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 283 ++++--
.../apache/cassandra/cql3/QueryProcessor.java | 29 +-
.../org/apache/cassandra/cql3/ResultSet.java | 6 +-
src/java/org/apache/cassandra/cql3/Sets.java | 26 +-
src/java/org/apache/cassandra/cql3/Term.java | 18 +-
.../apache/cassandra/cql3/UpdateParameters.java | 6 +-
.../org/apache/cassandra/cql3/UserTypes.java | 18 +-
.../cassandra/cql3/functions/FunctionCall.java | 20 +-
.../cql3/statements/BatchStatement.java | 95 +-
.../cql3/statements/CQL3CasConditions.java | 14 +-
.../cql3/statements/ModificationStatement.java | 63 +-
.../cassandra/cql3/statements/Restriction.java | 28 +-
.../cql3/statements/SelectStatement.java | 150 +--
.../org/apache/cassandra/db/DefsTables.java | 19 +-
.../cassandra/db/marshal/CollectionType.java | 29 +-
.../apache/cassandra/db/marshal/ListType.java | 12 +-
.../apache/cassandra/db/marshal/MapType.java | 21 +-
.../apache/cassandra/db/marshal/SetType.java | 15 +-
.../apache/cassandra/db/marshal/UserType.java | 5 +
.../hadoop/pig/AbstractCassandraStorage.java | 11 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 11 +-
.../serializers/CollectionSerializer.java | 106 ++-
.../cassandra/serializers/ListSerializer.java | 39 +-
.../cassandra/serializers/MapSerializer.java | 48 +-
.../cassandra/serializers/SetSerializer.java | 39 +-
.../cassandra/service/IMigrationListener.java | 3 +
.../cassandra/service/MigrationManager.java | 18 +
.../cassandra/thrift/CassandraServer.java | 4 +-
.../org/apache/cassandra/transport/CBUtil.java | 17 +
.../org/apache/cassandra/transport/Client.java | 4 +-
.../apache/cassandra/transport/DataType.java | 79 +-
.../org/apache/cassandra/transport/Event.java | 158 +++-
.../apache/cassandra/transport/OptionCodec.java | 28 +-
.../org/apache/cassandra/transport/Server.java | 21 +-
.../cassandra/transport/SimpleClient.java | 4 +-
.../transport/messages/BatchMessage.java | 53 +-
.../transport/messages/EventMessage.java | 6 +-
.../transport/messages/ExecuteMessage.java | 5 +-
.../cassandra/transport/SerDeserTest.java | 217 +++++
52 files changed, 2232 insertions(+), 646 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64e5afb..a4811f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,7 @@
* Preemptive opening of compaction result (CASSANDRA-6916)
* Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
* Optimize cellname comparison (CASSANDRA-6934)
+ * Native protocol v3 (CASSANDRA-6855)
Merged from 2.0:
* Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
* Set JMX RMI port to 7199 (CASSANDRA-7087)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d2b892e..ba29b37 100644
--- a/build.xml
+++ b/build.xml
@@ -387,7 +387,7 @@
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.airlift" artifactId="airline" version="0.6" />
- <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="io.netty" artifactId="netty" version="4.0.17.Final" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
new file mode 100644
index 0000000..6662f1c
--- /dev/null
+++ b/doc/native_protocol_v3.spec
@@ -0,0 +1,911 @@
+
+ CQL BINARY PROTOCOL v3
+
+
+Table of Contents
+
+ 1. Overview
+ 2. Frame header
+ 2.1. version
+ 2.2. flags
+ 2.3. stream
+ 2.4. opcode
+ 2.5. length
+ 3. Notations
+ 4. Messages
+ 4.1. Requests
+ 4.1.1. STARTUP
+ 4.1.2. AUTH_RESPONSE
+ 4.1.3. OPTIONS
+ 4.1.4. QUERY
+ 4.1.5. PREPARE
+ 4.1.6. EXECUTE
+ 4.1.7. BATCH
+ 4.1.8. REGISTER
+ 4.2. Responses
+ 4.2.1. ERROR
+ 4.2.2. READY
+ 4.2.3. AUTHENTICATE
+ 4.2.4. SUPPORTED
+ 4.2.5. RESULT
+ 4.2.5.1. Void
+ 4.2.5.2. Rows
+ 4.2.5.3. Set_keyspace
+ 4.2.5.4. Prepared
+ 4.2.5.5. Schema_change
+ 4.2.6. EVENT
+ 4.2.7. AUTH_CHALLENGE
+ 4.2.8. AUTH_SUCCESS
+ 5. Compression
+ 6. Collection types
+ 7. User Defined types
+ 8. Result paging
+ 9. Error codes
+ 10. Changes from v2
+
+
+1. Overview
+
+ The CQL binary protocol is a frame based protocol. Frames are defined as:
+
+ 0 8 16 24 32
+ +---------+---------+---------+---------+
+ | version | flags | stream | opcode |
+ +---------+---------+---------+---------+
+ | length |
+ +---------+---------+---------+---------+
+ | |
+ . ... body ... .
+ . .
+ . .
+ +----------------------------------------
+
+ The protocol is big-endian (network byte order).
+
+ Each frame contains a fixed size header (8 bytes) followed by a variable size
+ body. The header is described in Section 2. The content of the body depends
+ on the header opcode value (the body can in particular be empty for some
+ opcode values). The list of allowed opcode is defined Section 2.3 and the
+ details of each corresponding message is described Section 4.
+
+ The protocol distinguishes 2 types of frames: requests and responses. Requests
+ are those frame sent by the clients to the server, response are the ones sent
+ by the server. Note however that the protocol supports server pushes (events)
+ so responses does not necessarily come right after a client request.
+
+ Note to client implementors: clients library should always assume that the
+ body of a given frame may contain more data than what is described in this
+ document. It will however always be safe to ignore the remaining of the frame
+ body in such cases. The reason is that this may allow to sometimes extend the
+ protocol with optional features without needing to change the protocol
+ version.
+
+
+
+2. Frame header
+
+2.1. version
+
+ The version is a single byte that indicate both the direction of the message
+ (request or response) and the version of the protocol in use. The up-most bit
+ of version is used to define the direction of the message: 0 indicates a
+ request, 1 indicates a responses. This can be useful for protocol analyzers to
+ distinguish the nature of the packet from the direction which it is moving.
+ The rest of that byte is the protocol version (3 for the protocol defined in
+ this document). In other words, for this version of the protocol, version will
+ have one of:
+ 0x03 Request frame for this protocol version
+ 0x83 Response frame for this protocol version
+
+ Please note that the while every message ship with the version, only one version
+ of messages is accepted on a given connection. In other words, the first message
+ exchanged (STARTUP) sets the version for the connection for the lifetime of this
+ connection.
+
+ This document describe the version 3 of the protocol. For the changes made since
+ version 2, see Section 10.
+
+
+2.2. flags
+
+ Flags applying to this frame. The flags have the following meaning (described
+ by the mask that allow to select them):
+ 0x01: Compression flag. If set, the frame body is compressed. The actual
+ compression to use should have been set up beforehand through the
+ Startup message (which thus cannot be compressed; Section 4.1.1).
+ 0x02: Tracing flag. For a request frame, this indicate the client requires
+ tracing of the request. Note that not all requests support tracing.
+ Currently, only QUERY, PREPARE and EXECUTE queries support tracing.
+ Other requests will simply ignore the tracing flag if set. If a
+ request support tracing and the tracing flag was set, the response to
+ this request will have the tracing flag set and contain tracing
+ information.
+ If a response frame has the tracing flag set, its body contains
+ a tracing ID. The tracing ID is a [uuid] and is the first thing in
+ the frame body. The rest of the body will then be the usual body
+ corresponding to the response opcode.
+
+ The rest of the flags is currently unused and ignored.
+
+2.3. stream
+
+ A frame has a stream id (one signed byte). When sending request messages, this
+ stream id must be set by the client to a positive byte (negative stream id
+ are reserved for streams initiated by the server; currently all EVENT messages
+ (section 4.2.6) have a streamId of -1). If a client sends a request message
+ with the stream id X, it is guaranteed that the stream id of the response to
+ that message will be X.
+
+ This allow to deal with the asynchronous nature of the protocol. If a client
+ sends multiple messages simultaneously (without waiting for responses), there
+ is no guarantee on the order of the responses. For instance, if the client
+ writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
+ respond to REQ_3 (or REQ_2) first. Assigning different stream id to these 3
+ requests allows the client to distinguish to which request an received answer
+ respond to. As there can only be 128 different simultaneous stream, it is up
+ to the client to reuse stream id.
+
+ Note that clients are free to use the protocol synchronously (i.e. wait for
+ the response to REQ_N before sending REQ_N+1). In that case, the stream id
+ can be safely set to 0. Clients should also feel free to use only a subset of
+ the 128 maximum possible stream ids if it is simpler for those
+ implementation.
+
+2.4. opcode
+
+ An integer byte that distinguish the actual message:
+ 0x00 ERROR
+ 0x01 STARTUP
+ 0x02 READY
+ 0x03 AUTHENTICATE
+ 0x05 OPTIONS
+ 0x06 SUPPORTED
+ 0x07 QUERY
+ 0x08 RESULT
+ 0x09 PREPARE
+ 0x0A EXECUTE
+ 0x0B REGISTER
+ 0x0C EVENT
+ 0x0D BATCH
+ 0x0E AUTH_CHALLENGE
+ 0x0F AUTH_RESPONSE
+ 0x10 AUTH_SUCCESS
+
+ Messages are described in Section 4.
+
+ (Note that there is no 0x04 message in this version of the protocol)
+
+
+2.5. length
+
+ A 4 byte integer representing the length of the body of the frame (note:
+ currently a frame is limited to 256MB in length).
+
+
+3. Notations
+
+ To describe the layout of the frame body for the messages in Section 4, we
+ define the following:
+
+ [int] A 4 bytes integer
+ [long] A 8 bytes integer
+ [short] A 2 bytes unsigned integer
+ [string] A [short] n, followed by n bytes representing an UTF-8
+ string.
+ [long string] An [int] n, followed by n bytes representing an UTF-8 string.
+ [uuid] A 16 bytes long uuid.
+ [string list] A [short] n, followed by n [string].
+ [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0,
+ no byte should follow and the value represented is `null`.
+ [short bytes] A [short] n, followed by n bytes if n >= 0.
+
+ [option] A pair of <id><value> where <id> is a [short] representing
+ the option id and <value> depends on that option (and can be
+ of size 0). The supported id (and the corresponding <value>)
+ will be described when this is used.
+ [option list] A [short] n, followed by n [option].
+ [inet] An address (ip and port) to a node. It consists of one
+ [byte] n, that represents the address size, followed by n
+ [byte] representing the IP address (in practice n can only be
+ either 4 (IPv4) or 16 (IPv6)), following by one [int]
+ representing the port.
+ [consistency] A consistency level specification. This is a [short]
+ representing a consistency level with the following
+ correspondance:
+ 0x0000 ANY
+ 0x0001 ONE
+ 0x0002 TWO
+ 0x0003 THREE
+ 0x0004 QUORUM
+ 0x0005 ALL
+ 0x0006 LOCAL_QUORUM
+ 0x0007 EACH_QUORUM
+ 0x0008 SERIAL
+ 0x0009 LOCAL_SERIAL
+ 0x000A LOCAL_ONE
+
+ [string map] A [short] n, followed by n pair <k><v> where <k> and <v>
+ are [string].
+ [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
+ [string] and <v> is a [string list].
+
+
+4. Messages
+
+4.1. Requests
+
+ Note that outside of their normal responses (described below), all requests
+ can get an ERROR message (Section 4.2.1) as response.
+
+4.1.1. STARTUP
+
+ Initialize the connection. The server will respond by either a READY message
+ (in which case the connection is ready for queries) or an AUTHENTICATE message
+ (in which case credentials will need to be provided using AUTH_RESPONSE).
+
+ This must be the first message of the connection, except for OPTIONS that can
+ be sent before to find out the options supported by the server. Once the
+ connection has been initialized, a client should not send any more STARTUP
+ message.
+
+ The body is a [string map] of options. Possible options are:
+ - "CQL_VERSION": the version of CQL to use. This option is mandatory and
+ currenty, the only version supported is "3.0.0". Note that this is
+ different from the protocol version.
+ - "COMPRESSION": the compression algorithm to use for frames (See section 5).
+ This is optional, if not specified no compression will be used.
+
+
+4.1.2. AUTH_RESPONSE
+
+ Answers a server authentication challenge.
+
+ Authentication in the protocol is SASL based. The server sends authentication
+ challenges (a bytes token) to which the client answer with this message. Those
+ exchanges continue until the server accepts the authentication by sending a
+ AUTH_SUCCESS message after a client AUTH_RESPONSE. It is however that client that
+ initiate the exchange by sending an initial AUTH_RESPONSE in response to a
+ server AUTHENTICATE request.
+
+ The body of this message is a single [bytes] token. The details of what this
+ token contains (and when it can be null/empty, if ever) depends on the actual
+ authenticator used.
+
+ The response to a AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
+ an AUTH_SUCCESS message or an ERROR message.
+
+
+4.1.3. OPTIONS
+
+ Asks the server to return what STARTUP options are supported. The body of an
+ OPTIONS message should be empty and the server will respond with a SUPPORTED
+ message.
+
+
+4.1.4. QUERY
+
+ Performs a CQL query. The body of the message must be:
+ <query><query_parameters>
+ where <query> is a [long string] representing the query and
+ <query_parameters> must be
+ <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
+ where:
+ - <consistency> is the [consistency] level for the operation.
+ - <flags> is a [byte] whose bits define the options for this query and
+ in particular influence what the remainder of the message contains.
+ A flag is set if the bit corresponding to its `mask` is set. Supported
+ flags are, given there mask:
+ 0x01: Values. In that case, a [short] <n> followed by <n> [bytes]
+ values are provided. Those value are used for bound variables in
+ the query. Optionally, if the 0x40 flag is present, each value
+ will be preceded by a [string] name, representing the name of
+ the marker the value must be binded to. This is optional, and
+ if not present, values will be binded by position.
+ 0x02: Skip_metadata. If present, the Result Set returned as a response
+ to that query (if any) will have the NO_METADATA flag (see
+ Section 4.2.5.2).
+ 0x04: Page_size. In that case, <result_page_size> is an [int]
+ controlling the desired page size of the result (in CQL3 rows).
+ See the section on paging (Section 7) for more details.
+ 0x08: With_paging_state. If present, <paging_state> should be present.
+ <paging_state> is a [bytes] value that should have been returned
+ in a result set (Section 4.2.5.2). If provided, the query will be
+ executed but starting from a given paging state. This also to
+ continue paging on a different node from the one it has been
+ started (See Section 7 for more details).
+ 0x10: With serial consistency. If present, <serial_consistency> should be
+ present. <serial_consistency> is the [consistency] level for the
+ serial phase of conditional updates. That consitency can only be
+ either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+ SERIAL. This option will be ignored for anything else that a
+ conditional update/insert.
+ 0x20: With default timestamp. If present, <timestamp> should be present.
+ <timestamp> is a [long] representing the default timestamp for the query
+ in microseconds (negative values are forbidden). If provided, this will
+ replace the server side assigned timestamp as default timestamp.
+ Note that a timestamp in the query itself will still override
+ this timestamp. This is entirely optional.
+ 0x40: With names for values. This only makes sense if the 0x01 flag is set and
+ is ignored otherwise. If present, the values from the 0x01 flag will
+ be preceded by a name (see above). Note that this is only useful for
+ QUERY requests where named bind markers are used; for EXECUTE statements,
+ since the names for the expected values was returned during preparation,
+ a client can always provide values in the right order without any names
+ and using this flag, while supported, is almost surely inefficient.
+
+ Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
+ TRUNCATE, ...).
+
+ The server will respond to a QUERY message with a RESULT message, the content
+ of which depends on the query.
+
+
+4.1.5. PREPARE
+
+ Prepare a query for later execution (through EXECUTE). The body consists of
+ the CQL query to prepare as a [long string].
+
+ The server will respond with a RESULT message with a `prepared` kind (0x0004,
+ see Section 4.2.5).
+
+
+4.1.6. EXECUTE
+
+ Executes a prepared query. The body of the message must be:
+ <id><query_parameters>
+ where <id> is the prepared query ID. It's the [short bytes] returned as a
+ response to a PREPARE message. As for <query_parameters>, it has the exact
+ same definition than in QUERY (see Section 4.1.4).
+
+ The response from the server will be a RESULT message.
+
+
+4.1.7. BATCH
+
+ Allows executing a list of queries (prepared or not) as a batch (note that
+ only DML statements are accepted in a batch). The body of the message must
+ be:
+ <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
+ where:
+ - <type> is a [byte] indicating the type of batch to use:
+ - If <type> == 0, the batch will be "logged". This is equivalent to a
+ normal CQL3 batch statement.
+ - If <type> == 1, the batch will be "unlogged".
+ - If <type> == 2, the batch will be a "counter" batch (and non-counter
+ statements will be rejected).
+ - <flags> is a [byte] whose bits define the options for this query and
+ in particular influence the remainder of the message contains. It is similar
+ to the <flags> from QUERY and EXECUTE methods, except that the 4 rightmost
+ bits must always be 0 as their corresponding option do not make sense for
+ Batch. A flag is set if the bit corresponding to its `mask` is set. Supported
+ flags are, given there mask:
+ 0x10: With serial consistency. If present, <serial_consistency> should be
+ present. <serial_consistency> is the [consistency] level for the
+ serial phase of conditional updates. That consitency can only be
+ either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+ SERIAL. This option will be ignored for anything else that a
+ conditional update/insert.
+ 0x20: With default timestamp. If present, <timestamp> should be present.
+ <timestamp> is a [long] representing the default timestamp for the query
+ in microseconds. If provided, this will replace the server side assigned
+ timestamp as default timestamp. Note that a timestamp in the query itself
+ will still override this timestamp. This is entirely optional.
+ 0x40: With names for values. If set, then all values for all <query_i> must be
+ preceded by a [string] <name_i> that have the same meaning as in QUERY
+ requests.
+ - <n> is a [short] indicating the number of following queries.
+ - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
+ form:
+ <kind><string_or_id><n>[<name_1>]<value_1>...[<name_n>]<value_n>
+ where:
+ - <kind> is a [byte] indicating whether the following query is a prepared
+ one or not. <kind> value must be either 0 or 1.
+ - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be
+ a [long string] query string (as in QUERY, the query string might contain
+ bind markers). Otherwise (that is, if <kind> == 1), it should be a
+ [short bytes] representing a prepared query ID.
+ - <n> is a [short] indicating the number (possibly 0) of following values.
+ - <name_i> is the optional name of the following <value_i>. It must be present
+ if and only if the 0x40 flag is provided for the batch.
+ - <value_i> is the [bytes] to use for bound variable i (of bound variable <name_i>
+ if the 0x40 flag is used).
+ - <consistency> is the [consistency] level for the operation.
+ - <serial_consistency> is only present if the 0x10 flag is set. In that case,
+ <serial_consistency> is the [consistency] level for the serial phase of
+ conditional updates. That consitency can only be either SERIAL or
+ LOCAL_SERIAL and if not present will defaults to SERIAL. This option will
+ be ignored for anything else that a conditional update/insert.
+
+ The server will respond with a RESULT message.
+
+
+4.1.8. REGISTER
+
+ Register this connection to receive some type of events. The body of the
+ message is a [string list] representing the event types to register to. See
+ section 4.2.6 for the list of valid event types.
+
+ The response to a REGISTER message will be a READY message.
+
+ Please note that if a client driver maintains multiple connections to a
+ Cassandra node and/or connections to multiple nodes, it is advised to
+ dedicate a handful of connections to receive events, but to *not* register
+ for events on all connections, as this would only result in receiving
+ multiple times the same event messages, wasting bandwidth.
+
+
+4.2. Responses
+
+ This section describes the content of the frame body for the different
+ responses. Please note that to make room for future evolution, clients should
+ support extra informations (that they should simply discard) to the one
+ described in this document at the end of the frame body.
+
+4.2.1. ERROR
+
+ Indicates an error processing a request. The body of the message will be an
+ error code ([int]) followed by a [string] error message. Then, depending on
+ the exception, more content may follow. The error codes are defined in
+ Section 8, along with their additional content if any.
+
+
+4.2.2. READY
+
+ Indicates that the server is ready to process queries. This message will be
+ sent by the server either after a STARTUP message if no authentication is
+ required, or after a successful CREDENTIALS message.
+
+ The body of a READY message is empty.
+
+
+4.2.3. AUTHENTICATE
+
+ Indicates that the server require authentication, and which authentication
+ mechanism to use.
+
+ The authentication is SASL based and thus consists on a number of server
+ challenges (AUTH_CHALLENGE, Section 4.2.7) followed by client responses
+ (AUTH_RESPONSE, Section 4.1.2). The Initial exchange is however boostrapped
+ by an initial client response. The details of that exchange (including how
+ much challenge-response pair are required) are specific to the authenticator
+ in use. The exchange ends when the server sends an AUTH_SUCCESS message or
+ an ERROR message.
+
+ This message will be sent following a STARTUP message if authentication is
+ required and must be answered by a AUTH_RESPONSE message from the client.
+
+ The body consists of a single [string] indicating the full class name of the
+ IAuthenticator in use.
+
+
+4.2.4. SUPPORTED
+
+ Indicates which startup options are supported by the server. This message
+ comes as a response to an OPTIONS message.
+
+ The body of a SUPPORTED message is a [string multimap]. This multimap gives
+ for each of the supported STARTUP options, the list of supported values.
+
+
+4.2.5. RESULT
+
+ The result to a query (QUERY, PREPARE, EXECUTE or BATCH messages).
+
+ The first element of the body of a RESULT message is an [int] representing the
+ `kind` of result. The rest of the body depends on the kind. The kind can be
+ one of:
+ 0x0001 Void: for results carrying no information.
+ 0x0002 Rows: for results to select queries, returning a set of rows.
+ 0x0003 Set_keyspace: the result to a `use` query.
+ 0x0004 Prepared: result to a PREPARE message.
+ 0x0005 Schema_change: the result to a schema altering query.
+
+ The body for each kind (after the [int] kind) is defined below.
+
+
+4.2.5.1. Void
+
+ The rest of the body for a Void result is empty. It indicates that a query was
+ successful without providing more information.
+
+
+4.2.5.2. Rows
+
+ Indicates a set of rows. The rest of body of a Rows result is:
+ <metadata><rows_count><rows_content>
+ where:
+ - <metadata> is composed of:
+ <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+ where:
+ - <flags> is an [int]. The bits of <flags> provides information on the
+ formatting of the remaining informations. A flag is set if the bit
+ corresponding to its `mask` is set. Supported flags are, given there
+ mask:
+ 0x0001 Global_tables_spec: if set, only one table spec (keyspace
+ and table name) is provided as <global_table_spec>. If not
+ set, <global_table_spec> is not present.
+ 0x0002 Has_more_pages: indicates whether this is not the last
+ page of results and more should be retrieve. If set, the
+ <paging_state> will be present. The <paging_state> is a
+ [bytes] value that should be used in QUERY/EXECUTE to
+ continue paging and retrieve the remained of the result for
+ this query (See Section 7 for more details).
+ 0x0004 No_metadata: if set, the <metadata> is only composed of
+ these <flags>, the <column_count> and optionally the
+ <paging_state> (depending on the Has_more_pages flage) but
+ no other information (so no <global_table_spec> nor <col_spec_i>).
+ This will only ever be the case if this was requested
+ during the query (see QUERY and RESULT messages).
+ - <columns_count> is an [int] representing the number of columns selected
+ by the query this result is of. It defines the number of <col_spec_i>
+ elements in and the number of element for each row in <rows_content>.
+ - <global_table_spec> is present if the Global_tables_spec is set in
+ <flags>. If present, it is composed of two [string] representing the
+ (unique) keyspace name and table name the columns return are of.
+ - <col_spec_i> specifies the columns returned in the query. There is
+ <column_count> such column specifications that are composed of:
+ (<ksname><tablename>)?<name><type>
+ The initial <ksname> and <tablename> are two [string] are only present
+ if the Global_tables_spec flag is not set. The <column_name> is a
+ [string] and <type> is an [option] that correspond to the description
+ (what this description is depends a bit on the context: in results to
+ selects, this will be either the user chosen alias or the selection used
+ (often a colum name, but it can be a function call too). In results to
+ a PREPARE, this will be either the name of the bind variable corresponding
+ or the column name for the variable if it is "anonymous") and type of
+ the corresponding result. The option for <type> is either a native
+ type (see below), in which case the option has no value, or a
+ 'custom' type, in which case the value is a [string] representing
+ the full qualified class name of the type represented. Valid option
+ ids are:
+ 0x0000 Custom: the value is a [string], see above.
+ 0x0001 Ascii
+ 0x0002 Bigint
+ 0x0003 Blob
+ 0x0004 Boolean
+ 0x0005 Counter
+ 0x0006 Decimal
+ 0x0007 Double
+ 0x0008 Float
+ 0x0009 Int
+ 0x000B Timestamp
+ 0x000C Uuid
+ 0x000D Varchar
+ 0x000E Varint
+ 0x000F Timeuuid
+ 0x0010 Inet
+ 0x0020 List: the value is an [option], representing the type
+ of the elements of the list.
+ 0x0021 Map: the value is two [option], representing the types of the
+ keys and values of the map
+ 0x0022 Set: the value is an [option], representing the type
+ of the elements of the set
+ 0x0030 UDT: the value is <ks><udt_name><n><name_1><type_1>...<name_n><type_n>
+ where:
+ - <ks> is a [string] representing the keyspace name this
+ UDT is part of.
+ - <udt_name> is a [string] representing the UDT name.
+ - <n> is a [short] reprensenting the number of fields of
+ the UDT, and thus the number of <name_i><type_i> pair
+ following
+ - <name_i> is a [string] representing the name of the
+ i_th field of the UDT.
+ - <type_i> is an [option] representing the type of the
+ i_th field of the UDT.
+
+ - <rows_count> is an [int] representing the number of rows present in this
+ result. Those rows are serialized in the <rows_content> part.
+ - <rows_content> is composed of <row_1>...<row_m> where m is <rows_count>.
+ Each <row_i> is composed of <value_1>...<value_n> where n is
+ <columns_count> and where <value_j> is a [bytes] representing the value
+ returned for the jth column of the ith row. In other words, <rows_content>
+ is composed of (<rows_count> * <columns_count>) [bytes].
+
+
+4.2.5.3. Set_keyspace
+
+ The result to a `use` query. The body (after the kind [int]) is a single
+ [string] indicating the name of the keyspace that has been set.
+
+
+4.2.5.4. Prepared
+
+ The result to a PREPARE message. The rest of the body of a Prepared result is:
+ <id><metadata><result_metadata>
+ where:
+ - <id> is [short bytes] representing the prepared query ID.
+ - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2; you
+ can however assume that the Has_more_pages flag is always off) and
+ is the specification for the variable bound in this prepare statement.
+ - <result_metadata> is defined exactly as <metadata> but correspond to the
+ metadata for the resultSet that execute this query will yield. Note that
+ <result_metadata> may be empty (have the No_metadata flag and 0 columns, See
+ section 4.2.5.2) and will be for any query that is not a Select. There is
+ in fact never a guarantee that this will non-empty so client should protect
+ themselves accordingly. The presence of this information is an
+ optimization that allows to later execute the statement that has been
+ prepared without requesting the metadata (Skip_metadata flag in EXECUTE).
+ Clients can safely discard this metadata if they do not want to take
+ advantage of that optimization.
+
+ Note that prepared query ID return is global to the node on which the query
+ has been prepared. It can be used on any connection to that node and this
+ until the node is restarted (after which the query must be reprepared).
+
+4.2.5.5. Schema_change
+
+ The result to a schema altering query (creation/update/drop of a
+ keyspace/table/index). The body (after the kind [int]) is composed of 3
+ [string]:
+ <change><keyspace><table>
+ where:
+ - <change> describe the type of change that has occured. It can be one of
+ "CREATED", "UPDATED" or "DROPPED".
+ - <keyspace> is the name of the affected keyspace or the keyspace of the
+ affected table.
+ - <table> is the name of the affected table. <table> will be empty (i.e.
+ the empty string "") if the change was affecting a keyspace and not a
+ table.
+
+ Note that queries to create and drop an index are considered as change
+ updating the table the index is on.
+
+
+4.2.6. EVENT
+
+ And event pushed by the server. A client will only receive events for the
+ type it has REGISTER to. The body of an EVENT message will start by a
+ [string] representing the event type. The rest of the message depends on the
+ event type. The valid event types are:
+ - "TOPOLOGY_CHANGE": events related to change in the cluster topology.
+ Currently, events are sent when new nodes are added to the cluster, and
+ when nodes are removed. The body of the message (after the event type)
+ consists of a [string] and an [inet], corresponding respectively to the
+ type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of
+ the new/removed node.
+ - "STATUS_CHANGE": events related to change of node status. Currently,
+ up/down events are sent. The body of the message (after the event type)
+ consists of a [string] and an [inet], corresponding respectively to the
+ type of status change ("UP" or "DOWN") followed by the address of the
+ concerned node.
+ - "SCHEMA_CHANGE": events related to schema change. After the event type,
+ the rest of the message will be <change_type><target><options> where:
+ - <change_type> is the type of changed involved. It will be one of
+ "CREATED", "UPDATED" or "DROPPED".
+ - <target> can be one of "KEYSPACE", "TABLE" or "TYPE" and describes
+ what has been modified ("TYPE" stands for modifications related to
+ user types).
+ - <options> depends on the preceding <target>. If <target> is
+ "KEYSPACE", then <options> will be a single [string] representing the
+ keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then
+ <options> will be 2 [string]: the first one will be the keyspace
+ containing the affected object, and the second one will be the name
+ of said affected object (so either the table name or the user type
+ name).
+
+ All EVENT message have a streamId of -1 (Section 2.3).
+
+ Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
+ communication and as such may be sent a short delay before the binary
+ protocol server on the newly up node is fully started. Clients are thus
+ advise to wait a short time before trying to connect to the node (1 seconds
+ should be enough), otherwise they may experience a connection refusal at
+ first.
+
+4.2.7. AUTH_CHALLENGE
+
+ A server authentication challenge (see AUTH_RESPONSE (Section 4.1.2) for more
+ details).
+
+ The body of this message is a single [bytes] token. The details of what this
+ token contains (and when it can be null/empty, if ever) depends on the actual
+ authenticator used.
+
+ Clients are expected to answer the server challenge by an AUTH_RESPONSE
+ message.
+
+4.2.7. AUTH_SUCCESS
+
+ Indicate the success of the authentication phase. See Section 4.2.3 for more
+ details.
+
+ The body of this message is a single [bytes] token holding final information
+ from the server that the client may require to finish the authentication
+ process. What that token contains and whether it can be null depends on the
+ actual authenticator used.
+
+
+5. Compression
+
+ Frame compression is supported by the protocol, but then only the frame body
+ is compressed (the frame header should never be compressed).
+
+ Before being used, client and server must agree on a compression algorithm to
+ use, which is done in the STARTUP message. As a consequence, a STARTUP message
+ must never be compressed. However, once the STARTUP frame has been received
+ by the server can be compressed (including the response to the STARTUP
+ request). Frame do not have to be compressed however, even if compression has
+ been agreed upon (a server may only compress frame above a certain size at its
+ discretion). A frame body should be compressed if and only if the compressed
+ flag (see Section 2.2) is set.
+
+ As of this version 2 of the protocol, the following compressions are available:
+ - lz4 (https://code.google.com/p/lz4/). In that, note that the 4 first bytes
+ of the body will be the uncompressed length (followed by the compressed
+ bytes).
+ - snappy (https://code.google.com/p/snappy/). This compression might not be
+ available as it depends on a native lib (server-side) that might not be
+ avaivable on some installation.
+
+
+6. Collection types
+
+ This section describe the serialization format for the collection types:
+ list, map and set. This serialization format is both useful to decode values
+ returned in RESULT messages but also to encode values for EXECUTE ones.
+
+ The serialization formats are:
+ List: a [int] n indicating the size of the list, followed by n elements.
+ Each element is [bytes] representing the serialized element
+ value.
+ Map: a [int] n indicating the size of the map, followed by n entries.
+ Each entry is composed of two [bytes] representing the key and
+ the value of the entry map.
+ Set: a [int] n indicating the size of the set, followed by n elements.
+ Each element is [bytes] representing the serialized element
+ value.
+
+
+7. User defined types
+
+ This section describe the serialization format for User defined types (UDT) values.
+ UDT values are the values of the User Defined Types as defined in section 4.2.5.2.
+
+ A UDT value is a [short] n indicating the number of values (field) of UDT values
+ followed by n elements. Each element is a [short bytes] representing the serialized
+ field.
+
+
+8. Result paging
+
+ The protocol allows for paging the result of queries. For that, the QUERY and
+ EXECUTE messages have a <result_page_size> value that indicate the desired
+ page size in CQL3 rows.
+
+ If a positive value is provided for <result_page_size>, the result set of the
+ RESULT message returned for the query will contain at most the
+ <result_page_size> first rows of the query result. If that first page of result
+ contains the full result set for the query, the RESULT message (of kind `Rows`)
+ will have the Has_more_pages flag *not* set. However, if some results are not
+ part of the first response, the Has_more_pages flag will be set and the result
+ will contain a <paging_state> value. In that case, the <paging_state> value
+ should be used in a QUERY or EXECUTE message (that has the *same* query than
+ the original one or the behavior is undefined) to retrieve the next page of
+ results.
+
+ Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+ support paging. For other type of queries, the <result_page_size> value is
+ ignored.
+
+ Note to client implementors:
+ - While <result_page_size> can be as low as 1, it will likely be detrimental
+ to performance to pick a value too low. A value below 100 is probably too
+ low for most use cases.
+ - Clients should not rely on the actual size of the result set returned to
+ decide if there is more result to fetch or not. Instead, they should always
+ check the Has_more_pages flag (unless they did not enabled paging for the query
+ obviously). Clients should also not assert that no result will have more than
+ <result_page_size> results. While the current implementation always respect
+ the exact value of <result_page_size>, we reserve ourselves the right to return
+ slightly smaller or bigger pages in the future for performance reasons.
+
+
+9. Error codes
+
+ The supported error codes are described below:
+ 0x0000 Server error: something unexpected happened. This indicates a
+ server-side bug.
+ 0x000A Protocol error: some client message triggered a protocol
+ violation (for instance a QUERY message is sent before a STARTUP
+ one has been sent)
+ 0x0100 Bad credentials: CREDENTIALS request failed because Cassandra
+ did not accept the provided credentials.
+
+ 0x1000 Unavailable exception. The rest of the ERROR message body will be
+ <cl><required><alive>
+ where:
+ <cl> is the [consistency] level of the query having triggered
+ the exception.
+ <required> is an [int] representing the number of node that
+ should be alive to respect <cl>
+ <alive> is an [int] representing the number of replica that
+ were known to be alive when the request has been
+ processed (since an unavailable exception has been
+ triggered, there will be <alive> < <required>)
+ 0x1001 Overloaded: the request cannot be processed because the
+ coordinator node is overloaded
+ 0x1002 Is_bootstrapping: the request was a read request but the
+ coordinator node is bootstrapping
+ 0x1003 Truncate_error: error during a truncation error.
+ 0x1100 Write_timeout: Timeout exception during a write request. The rest
+ of the ERROR message body will be
+ <cl><received><blockfor><writeType>
+ where:
+ <cl> is the [consistency] level of the query having triggered
+ the exception.
+ <received> is an [int] representing the number of nodes having
+ acknowledged the request.
+ <blockfor> is the number of replica whose acknowledgement is
+ required to achieve <cl>.
+ <writeType> is a [string] that describe the type of the write
+ that timeouted. The value of that string can be one
+ of:
+ - "SIMPLE": the write was a non-batched
+ non-counter write.
+ - "BATCH": the write was a (logged) batch write.
+ If this type is received, it means the batch log
+ has been successfully written (otherwise a
+ "BATCH_LOG" type would have been send instead).
+ - "UNLOGGED_BATCH": the write was an unlogged
+ batch. Not batch log write has been attempted.
+ - "COUNTER": the write was a counter write
+ (batched or not).
+ - "BATCH_LOG": the timeout occured during the
+ write to the batch log when a (logged) batch
+ write was requested.
+ 0x1200 Read_timeout: Timeout exception during a read request. The rest
+ of the ERROR message body will be
+ <cl><received><blockfor><data_present>
+ where:
+ <cl> is the [consistency] level of the query having triggered
+ the exception.
+ <received> is an [int] representing the number of nodes having
+ answered the request.
+ <blockfor> is the number of replica whose response is
+ required to achieve <cl>. Please note that it is
+ possible to have <received> >= <blockfor> if
+ <data_present> is false. And also in the (unlikely)
+ case were <cl> is achieved but the coordinator node
+ timeout while waiting for read-repair
+ acknowledgement.
+ <data_present> is a single byte. If its value is 0, it means
+ the replica that was asked for data has not
+ responded. Otherwise, the value is != 0.
+
+ 0x2000 Syntax_error: The submitted query has a syntax error.
+ 0x2100 Unauthorized: The logged user doesn't have the right to perform
+ the query.
+ 0x2200 Invalid: The query is syntactically correct but invalid.
+ 0x2300 Config_error: The query is invalid because of some configuration issue
+ 0x2400 Already_exists: The query attempted to create a keyspace or a
+ table that was already existing. The rest of the ERROR message
+ body will be <ks><table> where:
+ <ks> is a [string] representing either the keyspace that
+ already exists, or the keyspace in which the table that
+ already exists is.
+ <table> is a [string] representing the name of the table that
+ already exists. If the query was attempting to create a
+ keyspace, <table> will be present but will be the empty
+ string.
+ 0x2500 Unprepared: Can be thrown while a prepared statement tries to be
+ executed if the provide prepared statement ID is not known by
+ this host. The rest of the ERROR message body will be [short
+ bytes] representing the unknown ID.
+
+10. Changes from v2
+ * BATCH messages now have <flags> (like QUERY and EXECUTE) and a corresponding optional
+ <serial_consistency> parameters (see Section 4.1.7).
+ * User Defined Types have to added to ResultSet metadata (see 4.2.5.2) and a new section
+ on the serialization format of UDT values has been added to the documentation
+ (Section 7).
+ * The serialization format for collection has changed (both the collection size and
+ the length of each argument is now 4 bytes long). See Section 6.
+ * QUERY, EXECUTE and BATCH messages can now optionally provide the default timestamp for the query.
+ As this feature is optionally enabled by clients, implementing it is at the discretion of the
+ client.
+ * QUERY, EXECUTE and BATCH messages can now optionally provide the names for the values of the
+ query. As this feature is optionally enabled by clients, implementing it is at the discretion of the
+ client.
+ * The format of "SCHEMA_CHANGE" notifications has been modified, and now includes changes related to
+ user types.
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 237fc99..528a54a 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -258,8 +258,8 @@ public class Auth
try
{
ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(),
- new QueryOptions(consistencyForUser(username),
- Lists.newArrayList(ByteBufferUtil.bytes(username))));
+ QueryOptions.forInternalCalls(consistencyForUser(username),
+ Lists.newArrayList(ByteBufferUtil.bytes(username))));
return UntypedResultSet.create(rows.result);
}
catch (RequestValidationException e)
@@ -287,6 +287,10 @@ public class Auth
DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
}
+ public void onDropUserType(String ksName, String userType)
+ {
+ }
+
public void onCreateKeyspace(String ksName)
{
}
@@ -295,6 +299,10 @@ public class Auth
{
}
+ public void onCreateUserType(String ksName, String userType)
+ {
+ }
+
public void onUpdateKeyspace(String ksName)
{
}
@@ -302,5 +310,9 @@ public class Auth
public void onUpdateColumnFamily(String ksName, String cfName)
{
}
+
+ public void onUpdateUserType(String ksName, String userType)
+ {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 85d2b16..b37dee2 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -71,9 +71,9 @@ public class CassandraAuthorizer implements IAuthorizer
try
{
ResultMessage.Rows rows = authorizeStatement.execute(QueryState.forInternalCalls(),
- new QueryOptions(ConsistencyLevel.ONE,
- Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
- ByteBufferUtil.bytes(resource.getName()))));
+ QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+ Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
+ ByteBufferUtil.bytes(resource.getName()))));
result = UntypedResultSet.create(rows.result);
}
catch (RequestValidationException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 1567bde..9256c2b 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -106,8 +106,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
try
{
ResultMessage.Rows rows = authenticateStatement.execute(QueryState.forInternalCalls(),
- new QueryOptions(consistencyForUser(username),
- Lists.newArrayList(ByteBufferUtil.bytes(username))));
+ QueryOptions.forInternalCalls(consistencyForUser(username),
+ Lists.newArrayList(ByteBufferUtil.bytes(username))));
result = UntypedResultSet.create(rows.result);
}
catch (RequestValidationException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 7c33e5b..435757b 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -56,12 +56,12 @@ public class Attributes
return timeToLive != null;
}
- public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+ public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
{
if (timestamp == null)
return now;
- ByteBuffer tval = timestamp.bindAndGet(variables);
+ ByteBuffer tval = timestamp.bindAndGet(options);
if (tval == null)
throw new InvalidRequestException("Invalid null value of timestamp");
@@ -77,12 +77,12 @@ public class Attributes
return LongType.instance.compose(tval);
}
- public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+ public int getTimeToLive(QueryOptions options) throws InvalidRequestException
{
if (timeToLive == null)
return 0;
- ByteBuffer tval = timeToLive.bindAndGet(variables);
+ ByteBuffer tval = timeToLive.bindAndGet(options);
if (tval == null)
throw new InvalidRequestException("Invalid null value of TTL");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index cbf5e92..2bb8071 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -18,38 +18,95 @@
package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
-/**
- * Options for a batch (at the protocol level) queries.
- */
-public class BatchQueryOptions
+public abstract class BatchQueryOptions
{
- private final ConsistencyLevel consistency;
- private final List<List<ByteBuffer>> values;
+ public static BatchQueryOptions DEFAULT = withoutPerStatementVariables(QueryOptions.DEFAULT);
+
+ protected final QueryOptions wrapped;
private final List<Object> queryOrIdList;
- public BatchQueryOptions(ConsistencyLevel cl, List<List<ByteBuffer>> values, List<Object> queryOrIdList)
+ protected BatchQueryOptions(QueryOptions wrapped, List<Object> queryOrIdList)
{
- this.consistency = cl;
- this.values = values;
+ this.wrapped = wrapped;
this.queryOrIdList = queryOrIdList;
}
+ public static BatchQueryOptions withoutPerStatementVariables(QueryOptions options)
+ {
+ return new WithoutPerStatementVariables(options, Collections.<Object>emptyList());
+ }
+
+ public static BatchQueryOptions withPerStatementVariables(QueryOptions options, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+ {
+ return new WithPerStatementVariables(options, variables, queryOrIdList);
+ }
+
+ public abstract QueryOptions forStatement(int i);
+
public ConsistencyLevel getConsistency()
{
- return consistency;
+ return wrapped.getConsistency();
}
- public List<List<ByteBuffer>> getValues()
+ public ConsistencyLevel getSerialConsistency()
{
- return values;
+ return wrapped.getSerialConsistency();
}
public List<Object> getQueryOrIdList()
{
return queryOrIdList;
}
+
+ public long getTimestamp(QueryState state)
+ {
+ return wrapped.getTimestamp(state);
+ }
+
+ private static class WithoutPerStatementVariables extends BatchQueryOptions
+ {
+ private WithoutPerStatementVariables(QueryOptions wrapped, List<Object> queryOrIdList)
+ {
+ super(wrapped, queryOrIdList);
+ }
+
+ public QueryOptions forStatement(int i)
+ {
+ return wrapped;
+ }
+ }
+
+ private static class WithPerStatementVariables extends BatchQueryOptions
+ {
+ private final List<QueryOptions> perStatementOptions;
+
+ private WithPerStatementVariables(QueryOptions wrapped, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+ {
+ super(wrapped, queryOrIdList);
+ this.perStatementOptions = new ArrayList<>(variables.size());
+ for (final List<ByteBuffer> vars : variables)
+ {
+ perStatementOptions.add(new QueryOptions.QueryOptionsWrapper(wrapped)
+ {
+ public List<ByteBuffer> getValues()
+ {
+ return vars;
+ }
+ });
+ }
+ }
+
+ public QueryOptions forStatement(int i)
+ {
+ return perStatementOptions.get(i);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
index 67e7174..c2617fe 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -74,21 +74,21 @@ public class ColumnCondition
value.collectMarkerSpecification(boundNames);
}
- public ColumnCondition.WithVariables with(List<ByteBuffer> variables)
+ public ColumnCondition.WithOptions with(QueryOptions options)
{
- return new WithVariables(variables);
+ return new WithOptions(options);
}
- public class WithVariables
+ public class WithOptions
{
- private final List<ByteBuffer> variables;
+ private final QueryOptions options;
- private WithVariables(List<ByteBuffer> variables)
+ private WithOptions(QueryOptions options)
{
- this.variables = variables;
+ this.options = options;
}
- public boolean equalsTo(WithVariables other) throws InvalidRequestException
+ public boolean equalsTo(WithOptions other) throws InvalidRequestException
{
if (!column().equals(other.column()))
return false;
@@ -103,11 +103,11 @@ public class ColumnCondition
? Int32Type.instance
: ((MapType)column.type).keys;
- if (comparator.compare(collectionElement().bindAndGet(variables), other.collectionElement().bindAndGet(variables)) != 0)
+ if (comparator.compare(collectionElement().bindAndGet(options), other.collectionElement().bindAndGet(options)) != 0)
return false;
}
- return value().bindAndGet(variables).equals(other.value().bindAndGet(other.variables));
+ return value().bindAndGet(options).equals(other.value().bindAndGet(other.options));
}
private ColumnDefinition column()
@@ -127,7 +127,7 @@ public class ColumnCondition
public ByteBuffer getCollectionElementValue() throws InvalidRequestException
{
- return collectionElement == null ? null : collectionElement.bindAndGet(variables);
+ return collectionElement == null ? null : collectionElement.bindAndGet(options);
}
/**
@@ -140,7 +140,7 @@ public class ColumnCondition
assert collectionElement == null;
Cell c = current.getColumn(current.metadata().comparator.create(rowPrefix, column));
- ByteBuffer v = value.bindAndGet(variables);
+ ByteBuffer v = value.bindAndGet(options);
return v == null
? c == null || !c.isLive(now)
: c != null && c.isLive(now) && c.value().equals(v);
@@ -148,15 +148,15 @@ public class ColumnCondition
private boolean collectionAppliesTo(CollectionType type, Composite rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
{
- Term.Terminal v = value.bind(variables);
+ Term.Terminal v = value.bind(options);
// For map element access, we won't iterate over the collection, so deal with that first. In other case, we do.
if (collectionElement != null && type instanceof MapType)
{
- ByteBuffer e = collectionElement.bindAndGet(variables);
+ ByteBuffer e = collectionElement.bindAndGet(options);
if (e == null)
throw new InvalidRequestException("Invalid null value for map access");
- return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(), now);
+ return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(options), now);
}
CellName name = current.metadata().comparator.create(rowPrefix, column);
@@ -178,11 +178,11 @@ public class ColumnCondition
if (collectionElement != null)
{
assert type instanceof ListType;
- ByteBuffer e = collectionElement.bindAndGet(variables);
+ ByteBuffer e = collectionElement.bindAndGet(options);
if (e == null)
throw new InvalidRequestException("Invalid null value for list access");
- return listElementAppliesTo((ListType)type, iter, e, v.get());
+ return listElementAppliesTo((ListType)type, iter, e, v.get(options));
}
switch (type.kind)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 3b7b4c4..5af84f0 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -53,7 +53,7 @@ public abstract class Constants
private final Term.Terminal NULL_VALUE = new Value(null)
{
@Override
- public Terminal bind(List<ByteBuffer> values)
+ public Terminal bind(QueryOptions options)
{
// We return null because that makes life easier for collections
return null;
@@ -246,13 +246,13 @@ public abstract class Constants
this.bytes = bytes;
}
- public ByteBuffer get()
+ public ByteBuffer get(QueryOptions options)
{
return bytes;
}
@Override
- public ByteBuffer bindAndGet(List<ByteBuffer> values)
+ public ByteBuffer bindAndGet(QueryOptions options)
{
return bytes;
}
@@ -267,11 +267,11 @@ public abstract class Constants
}
@Override
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
try
{
- ByteBuffer value = values.get(bindIndex);
+ ByteBuffer value = options.getValues().get(bindIndex);
if (value != null)
receiver.type.validate(value);
return value;
@@ -282,9 +282,9 @@ public abstract class Constants
}
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
- ByteBuffer bytes = bindAndGet(values);
+ ByteBuffer bytes = bindAndGet(options);
return bytes == null ? null : new Constants.Value(bytes);
}
}
@@ -299,7 +299,7 @@ public abstract class Constants
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
CellName cname = cf.getComparator().create(prefix, column);
- ByteBuffer value = t.bindAndGet(params.variables);
+ ByteBuffer value = t.bindAndGet(params.options);
cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
}
}
@@ -313,7 +313,7 @@ public abstract class Constants
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- ByteBuffer bytes = t.bindAndGet(params.variables);
+ ByteBuffer bytes = t.bindAndGet(params.options);
if (bytes == null)
throw new InvalidRequestException("Invalid null value for counter increment");
long increment = ByteBufferUtil.toLong(bytes);
@@ -331,7 +331,7 @@ public abstract class Constants
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- ByteBuffer bytes = t.bindAndGet(params.variables);
+ ByteBuffer bytes = t.bindAndGet(params.options);
if (bytes == null)
throw new InvalidRequestException("Invalid null value for counter increment");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 580a2c9..751ccdb 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -83,7 +83,7 @@ public abstract class Lists
values.add(t);
}
DelayedValue value = new DelayedValue(values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -128,13 +128,13 @@ public abstract class Lists
this.elements = elements;
}
- public static Value fromSerialized(ByteBuffer value, ListType type) throws InvalidRequestException
+ public static Value fromSerialized(ByteBuffer value, ListType type, int version) throws InvalidRequestException
{
try
{
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
- List<?> l = (List<?>)type.compose(value);
+ List<?> l = (List<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
List<ByteBuffer> elements = new ArrayList<ByteBuffer>(l.size());
for (Object element : l)
elements.add(type.elements.decompose(element));
@@ -146,9 +146,9 @@ public abstract class Lists
}
}
- public ByteBuffer get()
+ public ByteBuffer get(QueryOptions options)
{
- return CollectionType.pack(elements, elements.size());
+ return CollectionSerializer.pack(elements, elements.size(), options.getProtocolVersion());
}
}
@@ -180,12 +180,12 @@ public abstract class Lists
{
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(elements.size());
for (Term t : elements)
{
- ByteBuffer bytes = t.bindAndGet(values);
+ ByteBuffer bytes = t.bindAndGet(options);
if (bytes == null)
throw new InvalidRequestException("null is not supported inside collections");
@@ -210,10 +210,10 @@ public abstract class Lists
assert receiver.type instanceof ListType;
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
- ByteBuffer value = values.get(bindIndex);
- return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type);
+ ByteBuffer value = options.getValues().get(bindIndex);
+ return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type, options.getProtocolVersion());
}
}
@@ -299,8 +299,8 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- ByteBuffer index = idx.bindAndGet(params.variables);
- ByteBuffer value = t.bindAndGet(params.variables);
+ ByteBuffer index = idx.bindAndGet(params.options);
+ ByteBuffer value = t.bindAndGet(params.options);
if (index == null)
throw new InvalidRequestException("Invalid null value for list index");
@@ -342,7 +342,7 @@ public abstract class Lists
static void doAppend(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
// If we append null, do nothing. Note that for Setter, we've
// already removed the previous value so we're good here too
if (value == null)
@@ -367,7 +367,7 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
@@ -403,7 +403,7 @@ public abstract class Lists
if (existingList.isEmpty())
return;
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
@@ -437,7 +437,7 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal index = t.bind(params.variables);
+ Term.Terminal index = t.bind(params.options);
if (index == null)
throw new InvalidRequestException("Invalid null value for list index");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index d113b57..0c4980c 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -86,7 +86,7 @@ public abstract class Maps
values.put(k, v);
}
DelayedValue value = new DelayedValue(((MapType)receiver.type).keys, values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -142,13 +142,13 @@ public abstract class Maps
this.map = map;
}
- public static Value fromSerialized(ByteBuffer value, MapType type) throws InvalidRequestException
+ public static Value fromSerialized(ByteBuffer value, MapType type, int version) throws InvalidRequestException
{
try
{
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
- Map<?, ?> m = (Map<?, ?>)type.compose(value);
+ Map<?, ?> m = (Map<?, ?>)type.getSerializer().deserializeForNativeProtocol(value, version);
Map<ByteBuffer, ByteBuffer> map = new LinkedHashMap<ByteBuffer, ByteBuffer>(m.size());
for (Map.Entry<?, ?> entry : m.entrySet())
map.put(type.keys.decompose(entry.getKey()), type.values.decompose(entry.getValue()));
@@ -160,7 +160,7 @@ public abstract class Maps
}
}
- public ByteBuffer get()
+ public ByteBuffer get(QueryOptions options)
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(2 * map.size());
for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
@@ -168,7 +168,7 @@ public abstract class Maps
buffers.add(entry.getKey());
buffers.add(entry.getValue());
}
- return CollectionType.pack(buffers, map.size());
+ return CollectionSerializer.pack(buffers, map.size(), options.getProtocolVersion());
}
}
@@ -194,13 +194,13 @@ public abstract class Maps
{
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
Map<ByteBuffer, ByteBuffer> buffers = new TreeMap<ByteBuffer, ByteBuffer>(comparator);
for (Map.Entry<Term, Term> entry : elements.entrySet())
{
// We don't support values > 64K because the serialization format encode the length as an unsigned short.
- ByteBuffer keyBytes = entry.getKey().bindAndGet(values);
+ ByteBuffer keyBytes = entry.getKey().bindAndGet(options);
if (keyBytes == null)
throw new InvalidRequestException("null is not supported inside collections");
if (keyBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -208,7 +208,7 @@ public abstract class Maps
FBUtilities.MAX_UNSIGNED_SHORT,
keyBytes.remaining()));
- ByteBuffer valueBytes = entry.getValue().bindAndGet(values);
+ ByteBuffer valueBytes = entry.getValue().bindAndGet(options);
if (valueBytes == null)
throw new InvalidRequestException("null is not supported inside collections");
if (valueBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -230,10 +230,10 @@ public abstract class Maps
assert receiver.type instanceof MapType;
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
- ByteBuffer value = values.get(bindIndex);
- return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type);
+ ByteBuffer value = options.getValues().get(bindIndex);
+ return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type, options.getProtocolVersion());
}
}
@@ -272,8 +272,8 @@ public abstract class Maps
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- ByteBuffer key = k.bindAndGet(params.variables);
- ByteBuffer value = t.bindAndGet(params.variables);
+ ByteBuffer key = k.bindAndGet(params.options);
+ ByteBuffer value = t.bindAndGet(params.options);
if (key == null)
throw new InvalidRequestException("Invalid null map key");
@@ -310,7 +310,7 @@ public abstract class Maps
static void doPut(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
assert value instanceof Maps.Value;
@@ -333,7 +333,7 @@ public abstract class Maps
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal key = t.bind(params.variables);
+ Term.Terminal key = t.bind(params.options);
if (key == null)
throw new InvalidRequestException("Invalid null map key");
assert key instanceof Constants.Value;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 4d72333..2f28812 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.cql3;
import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.QueryState;
@@ -28,7 +29,7 @@ public interface QueryHandler
{
public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException;
- public CQLStatement getPrepared(MD5Digest id);
+ public ParsedStatement.Prepared getPrepared(MD5Digest id);
public CQLStatement getPreparedForThrift(Integer id);
public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException;
[5/5] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b4d9803
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b4d9803
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b4d9803
Branch: refs/heads/trunk
Commit: 6b4d980357de573f9128c3a065a6d11c54a3b571
Parents: ad34247 9872b74
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Apr 30 20:22:53 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Apr 30 20:22:53 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
doc/native_protocol_v3.spec | 911 +++++++++++++++++++
src/java/org/apache/cassandra/auth/Auth.java | 16 +-
.../cassandra/auth/CassandraAuthorizer.java | 6 +-
.../cassandra/auth/PasswordAuthenticator.java | 4 +-
.../org/apache/cassandra/cql3/Attributes.java | 8 +-
.../cassandra/cql3/BatchQueryOptions.java | 81 +-
.../apache/cassandra/cql3/ColumnCondition.java | 32 +-
.../org/apache/cassandra/cql3/Constants.java | 20 +-
src/java/org/apache/cassandra/cql3/Lists.java | 34 +-
src/java/org/apache/cassandra/cql3/Maps.java | 32 +-
.../org/apache/cassandra/cql3/QueryHandler.java | 3 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 283 ++++--
.../apache/cassandra/cql3/QueryProcessor.java | 29 +-
.../org/apache/cassandra/cql3/ResultSet.java | 6 +-
src/java/org/apache/cassandra/cql3/Sets.java | 26 +-
src/java/org/apache/cassandra/cql3/Term.java | 18 +-
.../apache/cassandra/cql3/UpdateParameters.java | 6 +-
.../org/apache/cassandra/cql3/UserTypes.java | 18 +-
.../cassandra/cql3/functions/FunctionCall.java | 20 +-
.../cql3/statements/BatchStatement.java | 95 +-
.../cql3/statements/CQL3CasConditions.java | 14 +-
.../cql3/statements/ModificationStatement.java | 63 +-
.../cassandra/cql3/statements/Restriction.java | 28 +-
.../cql3/statements/SelectStatement.java | 150 +--
.../org/apache/cassandra/db/DefsTables.java | 19 +-
.../cassandra/db/marshal/CollectionType.java | 29 +-
.../apache/cassandra/db/marshal/ListType.java | 12 +-
.../apache/cassandra/db/marshal/MapType.java | 21 +-
.../apache/cassandra/db/marshal/SetType.java | 15 +-
.../apache/cassandra/db/marshal/UserType.java | 5 +
.../hadoop/pig/AbstractCassandraStorage.java | 11 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 11 +-
.../serializers/CollectionSerializer.java | 106 ++-
.../cassandra/serializers/ListSerializer.java | 39 +-
.../cassandra/serializers/MapSerializer.java | 48 +-
.../cassandra/serializers/SetSerializer.java | 39 +-
.../cassandra/service/IMigrationListener.java | 3 +
.../cassandra/service/MigrationManager.java | 18 +
.../cassandra/thrift/CassandraServer.java | 4 +-
.../org/apache/cassandra/transport/CBUtil.java | 17 +
.../org/apache/cassandra/transport/Client.java | 4 +-
.../apache/cassandra/transport/DataType.java | 79 +-
.../org/apache/cassandra/transport/Event.java | 158 +++-
.../apache/cassandra/transport/OptionCodec.java | 28 +-
.../org/apache/cassandra/transport/Server.java | 21 +-
.../cassandra/transport/SimpleClient.java | 4 +-
.../transport/messages/BatchMessage.java | 53 +-
.../transport/messages/EventMessage.java | 6 +-
.../transport/messages/ExecuteMessage.java | 5 +-
.../cassandra/transport/SerDeserTest.java | 217 +++++
52 files changed, 2232 insertions(+), 646 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/build.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index db99060,40c45af..9a5ac92
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -51,13 -51,14 +51,13 @@@ public class QueryProcessor implements
private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
private static final MemoryMeter meter = new MemoryMeter().withGuessing(MemoryMeter.Guess.FALLBACK_BEST);
private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
- private static final int MAX_CACHE_PREPARED_COUNT = 10000;
- private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+ private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
{
@Override
- public int weightOf(MD5Digest key, CQLStatement value)
+ public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
{
- return Ints.checkedCast(measure(key) + measure(value));
+ return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
[3/5] Native protocol v3
Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 76b1eeb..12accaf 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -25,120 +26,244 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.transport.CBCodec;
import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.Pair;
/**
* Options for a query.
*/
-public class QueryOptions
+public abstract class QueryOptions
{
- public static final QueryOptions DEFAULT = new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+ public static final QueryOptions DEFAULT = new DefaultQueryOptions(ConsistencyLevel.ONE,
+ Collections.<ByteBuffer>emptyList(),
+ false,
+ SpecificOptions.DEFAULT,
+ 3);
public static final CBCodec<QueryOptions> codec = new Codec();
- private final ConsistencyLevel consistency;
- private final List<ByteBuffer> values;
- private final boolean skipMetadata;
-
- private final SpecificOptions options;
+ public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+ {
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+ }
- // The protocol version of incoming queries. This is set during deserializaion and will be 0
- // if the QueryOptions does not come from a user message (or come from thrift).
- private final transient int protocolVersion;
+ public static QueryOptions fromProtocolV2(ConsistencyLevel consistency, List<ByteBuffer> values)
+ {
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 2);
+ }
- public QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values)
+ public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
{
- this(consistency, values, false, SpecificOptions.DEFAULT, 0);
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 0);
}
- public QueryOptions(ConsistencyLevel consistency,
- List<ByteBuffer> values,
- boolean skipMetadata,
- int pageSize,
- PagingState pagingState,
- ConsistencyLevel serialConsistency)
+ public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
{
- this(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency), 0);
+ return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
}
- private QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+ public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency)
{
- this.consistency = consistency;
- this.values = values;
- this.skipMetadata = skipMetadata;
- this.options = options;
- this.protocolVersion = protocolVersion;
+ return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), 0);
}
- public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+ public abstract ConsistencyLevel getConsistency();
+ public abstract List<ByteBuffer> getValues();
+ public abstract boolean skipMetadata();
+
+ /** The pageSize for this query. Will be <= 0 if not relevant for the query. */
+ public int getPageSize()
{
- return new QueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+ return getSpecificOptions().pageSize;
}
- public ConsistencyLevel getConsistency()
+ /** The paging state for this query, or null if not relevant. */
+ public PagingState getPagingState()
{
- return consistency;
+ return getSpecificOptions().state;
}
- public List<ByteBuffer> getValues()
+ /** Serial consistency for conditional updates. */
+ public ConsistencyLevel getSerialConsistency()
{
- return values;
+ return getSpecificOptions().serialConsistency;
}
- public boolean skipMetadata()
+ public long getTimestamp(QueryState state)
{
- return skipMetadata;
+ long tstamp = getSpecificOptions().timestamp;
+ return tstamp >= 0 ? tstamp : state.getTimestamp();
}
/**
- * The pageSize for this query. Will be <= 0 if not relevant for the query.
+ * The protocol version for the query. Will be 3 if the object don't come from
+ * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
*/
- public int getPageSize()
+ public abstract int getProtocolVersion();
+
+ public abstract QueryOptions withProtocolVersion(int version);
+
+ // Mainly for the sake of BatchQueryOptions
+ abstract SpecificOptions getSpecificOptions();
+
+ public QueryOptions prepare(List<ColumnSpecification> specs)
{
- return options.pageSize;
+ return this;
}
- /**
- * The paging state for this query, or null if not relevant.
- */
- public PagingState getPagingState()
+ static class DefaultQueryOptions extends QueryOptions
{
- return options.state;
+ private final ConsistencyLevel consistency;
+ private final List<ByteBuffer> values;
+ private final boolean skipMetadata;
+
+ private final SpecificOptions options;
+
+ // The protocol version of incoming queries. This is set during deserializaion and will be 0
+ // if the QueryOptions does not come from a user message (or come from thrift).
+ private final transient int protocolVersion;
+
+ DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+ {
+ this.consistency = consistency;
+ this.values = values;
+ this.skipMetadata = skipMetadata;
+ this.options = options;
+ this.protocolVersion = protocolVersion;
+ }
+
+ public QueryOptions withProtocolVersion(int version)
+ {
+ return new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+ }
+
+ public ConsistencyLevel getConsistency()
+ {
+ return consistency;
+ }
+
+ public List<ByteBuffer> getValues()
+ {
+ return values;
+ }
+
+ public boolean skipMetadata()
+ {
+ return skipMetadata;
+ }
+
+ public int getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ SpecificOptions getSpecificOptions()
+ {
+ return options;
+ }
}
- /**
- * Serial consistency for conditional updates.
- */
- public ConsistencyLevel getSerialConsistency()
+ static abstract class QueryOptionsWrapper extends QueryOptions
{
- return options.serialConsistency;
+ protected final QueryOptions wrapped;
+
+ QueryOptionsWrapper(QueryOptions wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public ConsistencyLevel getConsistency()
+ {
+ return wrapped.getConsistency();
+ }
+
+ public boolean skipMetadata()
+ {
+ return wrapped.skipMetadata();
+ }
+
+ public int getProtocolVersion()
+ {
+ return wrapped.getProtocolVersion();
+ }
+
+ SpecificOptions getSpecificOptions()
+ {
+ return wrapped.getSpecificOptions();
+ }
+
+ @Override
+ public QueryOptions prepare(List<ColumnSpecification> specs)
+ {
+ wrapped.prepare(specs);
+ return this;
+ }
+
+ public QueryOptions withProtocolVersion(int version)
+ {
+ return new DefaultQueryOptions(getConsistency(), getValues(), skipMetadata(), getSpecificOptions(), version);
+ }
}
- /**
- * The protocol version for the query. Will be 0 if the object don't come from
- * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
- */
- public int getProtocolVersion()
+ static class OptionsWithNames extends QueryOptionsWrapper
{
- return protocolVersion;
+ private final List<String> names;
+ private List<ByteBuffer> orderedValues;
+
+ OptionsWithNames(DefaultQueryOptions wrapped, List<String> names)
+ {
+ super(wrapped);
+ this.names = names;
+ }
+
+ @Override
+ public QueryOptions prepare(List<ColumnSpecification> specs)
+ {
+ super.prepare(specs);
+
+ orderedValues = new ArrayList<ByteBuffer>(specs.size());
+ for (int i = 0; i < specs.size(); i++)
+ {
+ String name = specs.get(i).name.toString();
+ for (int j = 0; j < names.size(); j++)
+ {
+ if (name.equals(names.get(j)))
+ {
+ orderedValues.add(wrapped.getValues().get(j));
+ break;
+ }
+ }
+ }
+ return this;
+ }
+
+ public List<ByteBuffer> getValues()
+ {
+ assert orderedValues != null; // We should have called prepare first!
+ return orderedValues;
+ }
}
// Options that are likely to not be present in most queries
- private static class SpecificOptions
+ static class SpecificOptions
{
- private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null);
+ private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, -1L);
private final int pageSize;
private final PagingState state;
private final ConsistencyLevel serialConsistency;
+ private final long timestamp;
- private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency)
+ private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
{
this.pageSize = pageSize;
this.state = state;
this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
+ this.timestamp = timestamp;
}
}
@@ -151,7 +276,9 @@ public class QueryOptions
SKIP_METADATA,
PAGE_SIZE,
PAGING_STATE,
- SERIAL_CONSISTENCY;
+ SERIAL_CONSISTENCY,
+ TIMESTAMP,
+ NAMES_FOR_VALUES;
public static EnumSet<Flag> deserialize(int flags)
{
@@ -181,9 +308,21 @@ public class QueryOptions
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
- List<ByteBuffer> values = flags.contains(Flag.VALUES)
- ? CBUtil.readValueList(body)
- : Collections.<ByteBuffer>emptyList();
+ List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+ List<String> names = null;
+ if (flags.contains(Flag.VALUES))
+ {
+ if (flags.contains(Flag.NAMES_FOR_VALUES))
+ {
+ Pair<List<String>, List<ByteBuffer>> namesAndValues = CBUtil.readNameAndValueList(body);
+ names = namesAndValues.left;
+ values = namesAndValues.right;
+ }
+ else
+ {
+ values = CBUtil.readValueList(body);
+ }
+ }
boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
flags.remove(Flag.VALUES);
@@ -195,9 +334,19 @@ public class QueryOptions
int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
- options = new SpecificOptions(pageSize, pagingState, serialConsistency);
+ long timestamp = -1L;
+ if (flags.contains(Flag.TIMESTAMP))
+ {
+ long ts = body.readLong();
+ if (ts < 0)
+ throw new ProtocolException("Invalid negative (" + ts + ") protocol level timestamp");
+ timestamp = ts;
+ }
+
+ options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
}
- return new QueryOptions(consistency, values, skipMetadata, options, version);
+ DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+ return names == null ? opts : new OptionsWithNames(opts, names);
}
public void encode(QueryOptions options, ByteBuf dest, int version)
@@ -217,6 +366,12 @@ public class QueryOptions
CBUtil.writeValue(options.getPagingState().serialize(), dest);
if (flags.contains(Flag.SERIAL_CONSISTENCY))
CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+ if (flags.contains(Flag.TIMESTAMP))
+ dest.writeLong(options.getSpecificOptions().timestamp);
+
+ // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
+ // and in fact we never really encode QueryOptions, only decode them, so we
+ // don't bother.
}
public int encodedSize(QueryOptions options, int version)
@@ -236,6 +391,8 @@ public class QueryOptions
size += CBUtil.sizeOfValue(options.getPagingState().serialize());
if (flags.contains(Flag.SERIAL_CONSISTENCY))
size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+ if (flags.contains(Flag.TIMESTAMP))
+ size += 8;
return size;
}
@@ -245,7 +402,7 @@ public class QueryOptions
EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
if (options.getValues().size() > 0)
flags.add(Flag.VALUES);
- if (options.skipMetadata)
+ if (options.skipMetadata())
flags.add(Flag.SKIP_METADATA);
if (options.getPageSize() >= 0)
flags.add(Flag.PAGE_SIZE);
@@ -253,6 +410,8 @@ public class QueryOptions
flags.add(Flag.PAGING_STATE);
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
flags.add(Flag.SERIAL_CONSISTENCY);
+ if (options.getSpecificOptions().timestamp >= 0)
+ flags.add(Flag.TIMESTAMP);
return flags;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index e8cee15..40c45af 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -53,12 +53,12 @@ public class QueryProcessor implements QueryHandler
private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
private static final int MAX_CACHE_PREPARED_COUNT = 10000;
- private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+ private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
{
@Override
- public int weightOf(MD5Digest key, CQLStatement value)
+ public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
{
- return Ints.checkedCast(measure(key) + measure(value));
+ return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
}
};
@@ -71,12 +71,12 @@ public class QueryProcessor implements QueryHandler
}
};
- private static final ConcurrentLinkedHashMap<MD5Digest, CQLStatement> preparedStatements;
+ private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
static
{
- preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()
+ preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
.maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
.weigher(cqlMemoryUsageWeigher)
.build();
@@ -90,7 +90,7 @@ public class QueryProcessor implements QueryHandler
{
}
- public CQLStatement getPrepared(MD5Digest id)
+ public ParsedStatement.Prepared getPrepared(MD5Digest id)
{
return preparedStatements.get(id);
}
@@ -154,29 +154,31 @@ public class QueryProcessor implements QueryHandler
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- return instance.process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+ return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
}
public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
- CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
+ ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+ options.prepare(p.boundNames);
+ CQLStatement prepared = p.statement;
if (prepared.getBoundTerms() != options.getValues().size())
throw new InvalidRequestException("Invalid amount of bind variables");
return processStatement(prepared, queryState, options);
}
- public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
{
- return getStatement(queryStr, queryState.getClientState()).statement;
+ return getStatement(queryStr, queryState.getClientState());
}
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
{
try
{
- ResultMessage result = instance.process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+ ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
@@ -276,7 +278,7 @@ public class QueryProcessor implements QueryHandler
else
{
MD5Digest statementId = MD5Digest.compute(toHash);
- preparedStatements.put(statementId, prepared.statement);
+ preparedStatements.put(statementId, prepared);
logger.trace(String.format("Stored prepared statement %s with %d bind markers",
statementId,
prepared.statement.getBoundTerms()));
@@ -312,8 +314,7 @@ public class QueryProcessor implements QueryHandler
ClientState clientState = queryState.getClientState();
batch.checkAccess(clientState);
batch.validate(clientState);
-
- batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
+ batch.execute(queryState, options);
return new ResultMessage.Void();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 53ba380..eea0475 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -373,7 +373,7 @@ public class ResultSet
String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
- AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
+ AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
names.add(new ColumnSpecification(ksName, cfName, colName, type));
}
return new Metadata(flags, names).setHasMorePages(state);
@@ -410,7 +410,7 @@ public class ResultSet
CBUtil.writeString(name.cfName, dest);
}
CBUtil.writeString(name.name.toString(), dest);
- DataType.codec.writeOne(DataType.fromType(name.type), dest);
+ DataType.codec.writeOne(DataType.fromType(name.type, version), dest, version);
}
}
}
@@ -442,7 +442,7 @@ public class ResultSet
size += CBUtil.sizeOfString(name.cfName);
}
size += CBUtil.sizeOfString(name.name.toString());
- size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
+ size += DataType.codec.oneSerializedSize(DataType.fromType(name.type, version), version);
}
}
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index e48a3ce..92a3510 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -88,7 +88,7 @@ public abstract class Sets
values.add(t);
}
DelayedValue value = new DelayedValue(((SetType)receiver.type).elements, values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -140,13 +140,13 @@ public abstract class Sets
this.elements = elements;
}
- public static Value fromSerialized(ByteBuffer value, SetType type) throws InvalidRequestException
+ public static Value fromSerialized(ByteBuffer value, SetType type, int version) throws InvalidRequestException
{
try
{
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
- Set<?> s = (Set<?>)type.compose(value);
+ Set<?> s = (Set<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>(s.size());
for (Object element : s)
elements.add(type.elements.decompose(element));
@@ -158,9 +158,9 @@ public abstract class Sets
}
}
- public ByteBuffer get()
+ public ByteBuffer get(QueryOptions options)
{
- return CollectionType.pack(new ArrayList<ByteBuffer>(elements), elements.size());
+ return CollectionSerializer.pack(new ArrayList<ByteBuffer>(elements), elements.size(), options.getProtocolVersion());
}
}
@@ -186,12 +186,12 @@ public abstract class Sets
{
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
for (Term t : elements)
{
- ByteBuffer bytes = t.bindAndGet(values);
+ ByteBuffer bytes = t.bindAndGet(options);
if (bytes == null)
throw new InvalidRequestException("null is not supported inside collections");
@@ -216,10 +216,10 @@ public abstract class Sets
assert receiver.type instanceof SetType;
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
- ByteBuffer value = values.get(bindIndex);
- return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type);
+ ByteBuffer value = options.getValues().get(bindIndex);
+ return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type, options.getProtocolVersion());
}
}
@@ -253,7 +253,7 @@ public abstract class Sets
static void doAdd(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
@@ -277,7 +277,7 @@ public abstract class Sets
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d539ecf..481514f 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -48,7 +48,7 @@ public interface Term
* @return the result of binding all the variables of this NonTerminal (or
* 'this' if the term is terminal).
*/
- public Terminal bind(List<ByteBuffer> values) throws InvalidRequestException;
+ public Terminal bind(QueryOptions options) throws InvalidRequestException;
/**
* A shorter for bind(values).get().
@@ -56,7 +56,7 @@ public interface Term
* object between the bind and the get (note that we still want to be able
* to separate bind and get for collections).
*/
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException;
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException;
/**
* Whether or not that term contains at least one bind marker.
@@ -108,7 +108,7 @@ public interface Term
public abstract class Terminal implements Term
{
public void collectMarkerSpecification(VariableSpecifications boundNames) {}
- public Terminal bind(List<ByteBuffer> values) { return this; }
+ public Terminal bind(QueryOptions options) { return this; }
// While some NonTerminal may not have bind markers, no Term can be Terminal
// with a bind marker
@@ -120,11 +120,11 @@ public interface Term
/**
* @return the serialized value of this terminal.
*/
- public abstract ByteBuffer get();
+ public abstract ByteBuffer get(QueryOptions options);
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- return get();
+ return get(options);
}
}
@@ -140,10 +140,10 @@ public interface Term
*/
public abstract class NonTerminal implements Term
{
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- Terminal t = bind(values);
- return t == null ? null : t.get();
+ Terminal t = bind(options);
+ return t == null ? null : t.get(options);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index fad8fae..8a47536 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
public class UpdateParameters
{
public final CFMetaData metadata;
- public final List<ByteBuffer> variables;
+ public final QueryOptions options;
public final long timestamp;
private final int ttl;
public final int localDeletionTime;
@@ -42,10 +42,10 @@ public class UpdateParameters
// For lists operation that require a read-before-write. Will be null otherwise.
private final Map<ByteBuffer, CQL3Row> prefetchedLists;
- public UpdateParameters(CFMetaData metadata, List<ByteBuffer> variables, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
+ public UpdateParameters(CFMetaData metadata, QueryOptions options, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
{
this.metadata = metadata;
- this.variables = variables;
+ this.options = options;
this.timestamp = timestamp;
this.ttl = ttl;
this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index 2fd1a0f..2faa960 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -65,7 +65,7 @@ public abstract class UserTypes
values.add(value);
}
DelayedValue value = new DelayedValue(((UserType)receiver.type), values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -144,12 +144,16 @@ public abstract class UserTypes
values.get(i).collectMarkerSpecification(boundNames);
}
- private ByteBuffer[] bindInternal(List<ByteBuffer> variables) throws InvalidRequestException
+ private ByteBuffer[] bindInternal(QueryOptions options) throws InvalidRequestException
{
+ // Inside UDT values, we must force the serialization of collections whatever the protocol version is in
+ // use since we're going to store directly that serialized value.
+ options = options.withProtocolVersion(3);
+
ByteBuffer[] buffers = new ByteBuffer[values.size()];
for (int i = 0; i < type.types.size(); i++)
{
- ByteBuffer buffer = values.get(i).bindAndGet(variables);
+ ByteBuffer buffer = values.get(i).bindAndGet(options);
if (buffer == null)
throw new InvalidRequestException("null is not supported inside user type literals");
if (buffer.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -163,15 +167,15 @@ public abstract class UserTypes
return buffers;
}
- public Constants.Value bind(List<ByteBuffer> variables) throws InvalidRequestException
+ public Constants.Value bind(QueryOptions options) throws InvalidRequestException
{
- return new Constants.Value(bindAndGet(variables));
+ return new Constants.Value(bindAndGet(options));
}
@Override
- public ByteBuffer bindAndGet(List<ByteBuffer> variables) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- return CompositeType.build(bindInternal(variables));
+ return CompositeType.build(bindInternal(options));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 083543a..f99a2e4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -46,19 +46,19 @@ public class FunctionCall extends Term.NonTerminal
t.collectMarkerSpecification(boundNames);
}
- public Term.Terminal bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Term.Terminal bind(QueryOptions options) throws InvalidRequestException
{
- return makeTerminal(fun, bindAndGet(values));
+ return makeTerminal(fun, bindAndGet(options), options.getProtocolVersion());
}
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(terms.size());
for (Term t : terms)
{
// For now, we don't allow nulls as argument as no existing function needs it and it
// simplify things.
- ByteBuffer val = t.bindAndGet(values);
+ ByteBuffer val = t.bindAndGet(options);
if (val == null)
throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
buffers.add(val);
@@ -77,16 +77,16 @@ public class FunctionCall extends Term.NonTerminal
return false;
}
- private static Term.Terminal makeTerminal(Function fun, ByteBuffer result) throws InvalidRequestException
+ private static Term.Terminal makeTerminal(Function fun, ByteBuffer result, int version) throws InvalidRequestException
{
if (!(fun.returnType() instanceof CollectionType))
return new Constants.Value(result);
switch (((CollectionType)fun.returnType()).kind)
{
- case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType());
- case SET: return Sets.Value.fromSerialized(result, (SetType)fun.returnType());
- case MAP: return Maps.Value.fromSerialized(result, (MapType)fun.returnType());
+ case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType(), version);
+ case SET: return Sets.Value.fromSerialized(result, (SetType)fun.returnType(), version);
+ case MAP: return Maps.Value.fromSerialized(result, (MapType)fun.returnType(), version);
}
throw new AssertionError();
}
@@ -119,7 +119,7 @@ public class FunctionCall extends Term.NonTerminal
// If all parameters are terminal and the function is pure, we can
// evaluate it now, otherwise we'd have to wait execution time
return allTerminal && fun.isPure()
- ? makeTerminal(fun, execute(fun, parameters))
+ ? makeTerminal(fun, execute(fun, parameters), QueryOptions.DEFAULT.getProtocolVersion())
: new FunctionCall(fun, parameters);
}
@@ -130,7 +130,7 @@ public class FunctionCall extends Term.NonTerminal
for (Term t : parameters)
{
assert t instanceof Term.Terminal;
- buffers.add(((Term.Terminal)t).get());
+ buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
}
return fun.execute(buffers);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 88bb644..95d504d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -116,16 +116,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return statements;
}
- private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
- List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
- long timestamp = attrs.getTimestamp(now, statementVariables);
- addStatementMutations(statement, statementVariables, local, cl, timestamp, mutations);
+ QueryOptions statementOptions = options.forStatement(i);
+ long timestamp = attrs.getTimestamp(now, statementOptions);
+ addStatementMutations(statement, statementOptions, local, timestamp, mutations);
}
return unzipMutations(mutations);
}
@@ -143,9 +143,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
private void addStatementMutations(ModificationStatement statement,
- List<ByteBuffer> variables,
+ QueryOptions options,
boolean local,
- ConsistencyLevel cl,
long now,
Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
@@ -161,9 +160,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
// The following does the same than statement.getMutations(), but we inline it here because
// we don't want to recreate mutations every time as this is particularly inefficient when applying
// multiple batch to the same partition (see #6737).
- List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
- Composite clusteringPrefix = statement.createClusteringPrefix(variables);
- UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
+ Composite clusteringPrefix = statement.createClusteringPrefix(options);
+ UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
for (ByteBuffer key : keys)
{
@@ -172,7 +171,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
if (mutation == null)
{
mut = new Mutation(ksName, key);
- mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+ mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
ksMap.put(key, mutation);
}
else
@@ -209,29 +208,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- if (options.getConsistency() == null)
- throw new InvalidRequestException("Invalid empty consistency level");
-
- return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), options.getSerialConsistency(), queryState.getTimestamp());
+ return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
}
- public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
{
- if (cl == null)
- throw new InvalidRequestException("Invalid empty consistency level");
-
- return execute(new BatchOfPreparedVariables(variables), false, cl, ConsistencyLevel.SERIAL, queryState.getTimestamp());
+ return execute(options, false, options.getTimestamp(queryState));
}
- public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
+ public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
- // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
- // We'll need to fix that.
+ if (options.getConsistency() == null)
+ throw new InvalidRequestException("Invalid empty consistency level");
+ if (options.getSerialConsistency() == null)
+ throw new InvalidRequestException("Invalid empty serial consistency level");
+
if (hasConditions)
- return executeWithConditions(variables, cl, serialCl, now);
+ return executeWithConditions(options, now);
- executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+ executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
return null;
}
@@ -251,8 +247,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
}
-
- private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+ private ResultMessage executeWithConditions(BatchQueryOptions options, long now)
throws RequestExecutionException, RequestValidationException
{
ByteBuffer key = null;
@@ -265,9 +260,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
- List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
- long timestamp = attrs.getTimestamp(now, statementVariables);
- List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+ QueryOptions statementOptions = options.forStatement(i);
+ long timestamp = attrs.getTimestamp(now, statementOptions);
+ List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementOptions);
if (pks.size() > 1)
throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
if (key == null)
@@ -283,10 +278,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
}
- Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
+ Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions);
if (statement.hasConditions())
{
- statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+ statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementOptions, timestamp);
// As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition())
columnsWithConditions = null;
@@ -295,20 +290,20 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+ UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementOptions, false, now);
statement.addUpdateForKey(updates, key, clusteringPrefix, params);
}
}
verifyBatchSize(Collections.singleton(updates));
- ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+ ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, options.getSerialConsistency(), options.getConsistency());
return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
}
public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
{
assert !hasConditions;
- for (IMutation mutation : getMutations(PreparedBatchVariables.EMPTY, true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
@@ -322,38 +317,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
}
- public static class PreparedBatchVariables implements BatchVariables
- {
- public static final BatchVariables EMPTY = new PreparedBatchVariables(Collections.<ByteBuffer>emptyList());
-
- private final List<ByteBuffer> variables;
-
- public PreparedBatchVariables(List<ByteBuffer> variables)
- {
- this.variables = variables;
- }
-
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
- {
- return variables;
- }
- }
-
- public static class BatchOfPreparedVariables implements BatchVariables
- {
- private final List<List<ByteBuffer>> variables;
-
- public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
- {
- this.variables = variables;
- }
-
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
- {
- return variables.get(statementInBatch);
- }
- }
-
public String toString()
{
return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
index 4003edc..5005d2f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -71,7 +71,7 @@ public class CQL3CasConditions implements CASConditions
throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row");
}
- public void addConditions(Composite prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
{
RowCondition condition = conditions.get(prefix);
if (condition == null)
@@ -83,7 +83,7 @@ public class CQL3CasConditions implements CASConditions
{
throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
}
- ((ColumnsConditions)condition).addConditions(conds, variables);
+ ((ColumnsConditions)condition).addConditions(conds, options);
}
public IDiskAtomFilter readFilter()
@@ -167,21 +167,21 @@ public class CQL3CasConditions implements CASConditions
private static class ColumnsConditions extends RowCondition
{
- private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithVariables> conditions = new HashMap<>();
+ private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithOptions> conditions = new HashMap<>();
private ColumnsConditions(Composite rowPrefix, long now)
{
super(rowPrefix, now);
}
- public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
{
for (ColumnCondition condition : conds)
{
// We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
// different list of variables.
- ColumnCondition.WithVariables current = condition.with(variables);
- ColumnCondition.WithVariables previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
+ ColumnCondition.WithOptions current = condition.with(options);
+ ColumnCondition.WithOptions previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
// If 2 conditions are actually equal, let it slide
if (previous != null && !previous.equalsTo(current))
throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
@@ -193,7 +193,7 @@ public class CQL3CasConditions implements CASConditions
if (current == null)
return conditions.isEmpty();
- for (ColumnCondition.WithVariables condition : conditions.values())
+ for (ColumnCondition.WithOptions condition : conditions.values())
if (!condition.appliesTo(rowPrefix, current, now))
return false;
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4741b9a..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -119,9 +119,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return cfm.isCounter();
}
- public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+ public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimestamp(now, variables);
+ return attrs.getTimestamp(now, options);
}
public boolean isTimestampSet()
@@ -129,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return attrs.isTimestampSet();
}
- public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+ public int getTimeToLive(QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimeToLive(variables);
+ return attrs.getTimeToLive(options);
}
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -284,7 +284,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
}
- public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+ public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
throws InvalidRequestException
{
CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
@@ -295,7 +295,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (r == null)
throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (keyBuilder.remainingCount() == 1)
{
@@ -321,7 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return keys;
}
- public Composite createClusteringPrefix(List<ByteBuffer> variables)
+ public Composite createClusteringPrefix(QueryOptions options)
throws InvalidRequestException
{
// If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -353,10 +353,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
}
- return createClusteringPrefixBuilderInternal(variables);
+ return createClusteringPrefixBuilderInternal(options);
}
- private Composite createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+ private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
throws InvalidRequestException
{
CBuilder builder = cfm.comparator.prefixBuilder();
@@ -376,7 +376,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
else
{
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
assert values.size() == 1; // We only allow IN for row keys so far
ByteBuffer val = values.get(0);
if (val == null)
@@ -488,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+ Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -498,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = options.getValues();
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
// We don't support IN for CAS operation so far
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
ByteBuffer key = keys.get(0);
- CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
- Composite prefix = createClusteringPrefix(variables);
+ long now = options.getTimestamp(queryState);
+ CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+ Composite prefix = createClusteringPrefix(options);
ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
- addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+ addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
@@ -521,10 +521,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return new ResultMessage.Rows(buildCasResultSet(key, result));
}
- public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+ public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
throws InvalidRequestException
{
- UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+ UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
addUpdateForKey(updates, key, clusteringPrefix, updParams);
if (ifNotExists)
@@ -541,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
{
if (columnConditions != null)
- conditions.addConditions(clusteringPrefix, columnConditions, variables);
+ conditions.addConditions(clusteringPrefix, columnConditions, options);
if (staticConditions != null)
- conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, variables);
+ conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
}
}
@@ -614,7 +614,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
long now = System.currentTimeMillis();
Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
- SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+ SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
return builder.build();
}
@@ -624,7 +624,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
@@ -636,7 +636,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
/**
* Convert statement into a list of mutations to apply on the server
*
- * @param variables value for prepared statement markers
+ * @param options value for prepared statement markers
* @param local if true, any requests (for collections) performed by getMutation should be done locally only.
* @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
* @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -644,13 +644,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
- Composite clusteringPrefix = createClusteringPrefix(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
+ Composite clusteringPrefix = createClusteringPrefix(options);
- UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
@@ -659,22 +659,21 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
Mutation mut = new Mutation(cfm.ksName, key, cf);
- mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
+ mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
return mutations;
}
public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
Composite prefix,
- List<ByteBuffer> variables,
+ QueryOptions options,
boolean local,
- ConsistencyLevel cl,
long now)
throws RequestExecutionException, RequestValidationException
{
// Some lists operation requires reading
- Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
- return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+ return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 6b7eca7..4fd02c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -42,7 +42,7 @@ public interface Restriction
public boolean isContains();
// Not supported by Slice, but it's convenient to have here
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
public static class EQ implements Restriction
{
@@ -55,9 +55,9 @@ public interface Restriction
this.onToken = onToken;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
- return Collections.singletonList(value.bindAndGet(variables));
+ return Collections.singletonList(value.bindAndGet(options));
}
public boolean isSlice()
@@ -145,11 +145,11 @@ public interface Restriction
this.values = values;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
for (Term value : values)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}
@@ -174,9 +174,9 @@ public interface Restriction
this.marker = marker;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
- Lists.Value lval = marker.bind(variables);
+ Lists.Value lval = marker.bind(options);
if (lval == null)
throw new InvalidRequestException("Invalid null value for IN restriction");
return lval.elements;
@@ -234,7 +234,7 @@ public interface Restriction
return false;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
throw new UnsupportedOperationException();
}
@@ -249,9 +249,9 @@ public interface Restriction
return bounds[b.idx] != null;
}
- public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ public ByteBuffer bound(Bound b, QueryOptions options) throws InvalidRequestException
{
- return bounds[b.idx].bindAndGet(variables);
+ return bounds[b.idx].bindAndGet(options);
}
public boolean isInclusive(Bound b)
@@ -379,25 +379,25 @@ public interface Restriction
keys.add(t);
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
if (values == null)
return Collections.emptyList();
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
for (Term value : values)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}
- public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
{
if (keys == null)
return Collections.emptyList();
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
for (Term value : keys)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}
[2/5] Native protocol v3
Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d79bd5b..b9ccd1a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -185,23 +185,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
ConsistencyLevel cl = options.getConsistency();
- List<ByteBuffer> variables = options.getValues();
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
cl.validateForRead(keyspace());
- int limit = getLimit(variables);
+ int limit = getLimit(options);
int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
Pageable command;
if (isKeyRange || usesSecondaryIndexing)
{
- command = getRangeCommand(variables, limitForQuery, now);
+ command = getRangeCommand(options, limitForQuery, now);
}
else
{
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+ List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
command = commands == null ? null : new Pageable.ReadCommands(commands);
}
@@ -214,13 +213,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
{
- return execute(command, cl, variables, limit, now);
+ return execute(command, options, limit, now);
}
else
{
QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
if (parameters.isCount)
- return pageCountQuery(pager, variables, pageSize, now, limit);
+ return pageCountQuery(pager, options, pageSize, now, limit);
// We can't properly do post-query ordering if we page (see #6722)
if (needsPostQueryOrdering())
@@ -228,14 +227,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
+ "ORDER BY or the IN and sort client side, or disable paging for this query");
List<Row> page = pager.fetchPage(pageSize);
- ResultMessage.Rows msg = processResults(page, variables, limit, now);
+ ResultMessage.Rows msg = processResults(page, options, limit, now);
if (!pager.isExhausted())
msg.result.metadata.setHasMorePages(pager.state());
return msg;
}
}
- private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
{
List<Row> rows;
if (command == null)
@@ -245,21 +244,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else
{
rows = command instanceof Pageable.ReadCommands
- ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
- : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+ ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency())
+ : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
}
- return processResults(rows, variables, limit, now);
+ return processResults(rows, options, limit, now);
}
- private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows pageCountQuery(QueryPager pager, QueryOptions options, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
{
int count = 0;
while (!pager.isExhausted())
{
int maxLimit = pager.maxRemaining();
logger.debug("New maxLimit for paged count query is {}", maxLimit);
- ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
+ ResultSet rset = process(pager.fetchPage(pageSize), options, maxLimit, now);
count += rset.rows.size();
}
@@ -269,10 +268,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return new ResultMessage.Rows(result);
}
- public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some column together in sparse column families
- ResultSet rset = process(rows, variables, limit, now);
+ ResultSet rset = process(rows, options, limit, now);
rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
return new ResultMessage.Rows(rset);
}
@@ -288,29 +287,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = Collections.emptyList();
- int limit = getLimit(variables);
+ QueryOptions options = QueryOptions.DEFAULT;
+ int limit = getLimit(options);
int limitForQuery = updateLimitForQuery(limit);
long now = System.currentTimeMillis();
List<Row> rows;
if (isKeyRange || usesSecondaryIndexing)
{
- RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now);
+ RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
}
else
{
- List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+ List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
}
- return processResults(rows, variables, limit, now);
+ return processResults(rows, options, limit, now);
}
public ResultSet process(List<Row> rows) throws InvalidRequestException
{
assert !parameters.isCount; // not yet needed
- return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
+ QueryOptions options = QueryOptions.DEFAULT;
+ return process(rows, options, getLimit(options), System.currentTimeMillis());
}
public String keyspace()
@@ -323,15 +323,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return cfm.cfName;
}
- private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
{
- Collection<ByteBuffer> keys = getKeys(variables);
+ Collection<ByteBuffer> keys = getKeys(options);
if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
return null;
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
- IDiskAtomFilter filter = makeFilter(variables, limit);
+ IDiskAtomFilter filter = makeFilter(options, limit);
if (filter == null)
return null;
@@ -349,29 +349,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return commands;
}
- private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
{
- IDiskAtomFilter filter = makeFilter(variables, limit);
+ IDiskAtomFilter filter = makeFilter(options, limit);
if (filter == null)
return null;
- List<IndexExpression> expressions = getIndexExpressions(variables);
+ List<IndexExpression> expressions = getIndexExpressions(options);
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
- AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+ AbstractBounds<RowPosition> keyBounds = getKeyBounds(options);
return keyBounds == null
? null
: new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
}
- private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
+ private AbstractBounds<RowPosition> getKeyBounds(QueryOptions options) throws InvalidRequestException
{
IPartitioner<?> p = StorageService.getPartitioner();
if (onToken)
{
- Token startToken = getTokenBound(Bound.START, variables, p);
- Token endToken = getTokenBound(Bound.END, variables, p);
+ Token startToken = getTokenBound(Bound.START, options, p);
+ Token endToken = getTokenBound(Bound.END, options, p);
boolean includeStart = includeKeyBound(Bound.START);
boolean includeEnd = includeKeyBound(Bound.END);
@@ -397,8 +397,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
- ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
+ ByteBuffer startKeyBytes = getKeyBound(Bound.START, options);
+ ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options);
RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
@@ -421,7 +421,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
- private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
+ private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
throws InvalidRequestException
{
if (parameters.isDistinct)
@@ -431,8 +431,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else if (isColumnRange())
{
int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
- List<Composite> startBounds = getRequestedBound(Bound.START, variables);
- List<Composite> endBounds = getRequestedBound(Bound.END, variables);
+ List<Composite> startBounds = getRequestedBound(Bound.START, options);
+ List<Composite> endBounds = getRequestedBound(Bound.END, options);
assert startBounds.size() == endBounds.size();
// Handles fetching static columns. Note that for 2i, the filter is just used to restrict
@@ -516,7 +516,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- SortedSet<CellName> cellNames = getRequestedColumns(variables);
+ SortedSet<CellName> cellNames = getRequestedColumns(options);
if (cellNames == null) // in case of IN () for the last column of the key
return null;
QueryProcessor.validateCellNames(cellNames, cfm.comparator);
@@ -534,12 +534,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return new SliceQueryFilter(slices, isReversed, limit, toGroup);
}
- private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
+ private int getLimit(QueryOptions options) throws InvalidRequestException
{
int l = Integer.MAX_VALUE;
if (limit != null)
{
- ByteBuffer b = limit.bindAndGet(variables);
+ ByteBuffer b = limit.bindAndGet(options);
if (b == null)
throw new InvalidRequestException("Invalid null value of limit");
@@ -569,7 +569,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
: limit;
}
- private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
+ private Collection<ByteBuffer> getKeys(final QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
CBuilder builder = cfm.getKeyValidatorAsCType().builder();
@@ -578,7 +578,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Restriction r = keyRestrictions[def.position()];
assert r != null && !r.isSlice();
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (builder.remainingCount() == 1)
{
@@ -603,7 +603,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return keys;
}
- private ByteBuffer getKeyBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
{
// Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first
// component of a composite partition key).
@@ -612,10 +612,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
// We deal with IN queries for keys in other places, so we know buildBound will return only one result
- return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
+ return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer();
}
- private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
+ private Token getTokenBound(Bound b, QueryOptions options, IPartitioner<?> p) throws InvalidRequestException
{
assert onToken;
@@ -623,7 +623,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ByteBuffer value;
if (keyRestriction.isEQ())
{
- value = keyRestriction.values(variables).get(0);
+ value = keyRestriction.values(options).get(0);
}
else
{
@@ -631,7 +631,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (!slice.hasBound(b))
return p.getMinimumToken();
- value = slice.bound(b, variables);
+ value = slice.bound(b, options);
}
if (value == null)
@@ -669,7 +669,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return false;
}
- private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+ private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
{
// Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
// we always do a slice for CQL3 tables, so it's ok to ignore them here
@@ -682,7 +682,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ColumnDefinition def = idIter.next();
assert r != null && !r.isSlice();
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (values.size() == 1)
{
ByteBuffer val = values.get(0);
@@ -772,7 +772,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Restriction[] restrictions,
boolean isReversed,
CType type,
- List<ByteBuffer> variables) throws InvalidRequestException
+ QueryOptions options) throws InvalidRequestException
{
CBuilder builder = type.builder();
@@ -801,7 +801,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (r.isSlice())
{
- builder.add(getSliceValue(def, r, b, variables));
+ builder.add(getSliceValue(def, r, b, options));
Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b);
// We can have more non null restriction if the "scalar" notation was used for the bound (#4851).
@@ -813,13 +813,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (isNullRestriction(r, b))
break;
- builder.add(getSliceValue(def, r, b, variables));
+ builder.add(getSliceValue(def, r, b, options));
}
return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
}
else
{
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (values.size() != 1)
{
// IN query, we only support it on the clustering column
@@ -878,23 +878,23 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
}
- private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
{
Restriction.Slice slice = (Restriction.Slice)r;
assert slice.hasBound(b);
- ByteBuffer val = slice.bound(b, variables);
+ ByteBuffer val = slice.bound(b, options);
if (val == null)
throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
return val;
}
- private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
{
assert isColumnRange();
- return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
+ return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
}
- public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<IndexExpression> getIndexExpressions(QueryOptions options) throws InvalidRequestException
{
if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
return Collections.emptyList();
@@ -927,7 +927,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
if (slice.hasBound(b))
{
- ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
+ ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
}
}
@@ -935,12 +935,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
else if (restriction.isContains())
{
Restriction.Contains contains = (Restriction.Contains)restriction;
- for (ByteBuffer value : contains.values(variables))
+ for (ByteBuffer value : contains.values(options))
{
validateIndexedValue(def, value);
expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
}
- for (ByteBuffer key : contains.keys(variables))
+ for (ByteBuffer key : contains.keys(options))
{
validateIndexedValue(def, key);
expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
@@ -948,7 +948,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- List<ByteBuffer> values = restriction.values(variables);
+ List<ByteBuffer> values = restriction.values(options);
if (values.size() != 1)
throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
@@ -969,13 +969,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return value;
}
- private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final List<ByteBuffer> variables) throws InvalidRequestException
+ private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
{
assert sliceRestriction != null;
final CellNameType type = cfm.comparator;
- final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
- final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
+ final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, options));
+ final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, options));
return new AbstractIterator<Cell>()
{
@@ -998,7 +998,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
};
}
- private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
+ private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
{
Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
for (org.apache.cassandra.db.Row row : rows)
@@ -1007,12 +1007,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (row.cf == null)
continue;
- processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
+ processColumnFamily(row.key.getKey(), row.cf, options, now, result);
}
ResultSet cqlRows = result.build();
- orderResults(cqlRows, variables);
+ orderResults(cqlRows);
// Internal calls always return columns in the comparator order, even when reverse was set
if (isReversed)
@@ -1024,7 +1024,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
// Used by ModificationStatement for CAS operations
- void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
+ void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
throws InvalidRequestException
{
CFMetaData cfm = cf.metadata();
@@ -1040,7 +1040,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Iterator<Cell> cells = cf.getSortedColumns().iterator();
if (sliceRestriction != null)
- cells = applySliceRestriction(cells, variables);
+ cells = applySliceRestriction(cells, options);
CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
@@ -1059,7 +1059,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
result.add(keyComponents[def.position()]);
break;
case STATIC:
- addValue(result, def, staticRow);
+ addValue(result, def, staticRow, options);
break;
default:
result.add((ByteBuffer)null);
@@ -1089,17 +1089,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
result.add(cql3Row.getColumn(null));
break;
case REGULAR:
- addValue(result, def, cql3Row);
+ addValue(result, def, cql3Row, options);
break;
case STATIC:
- addValue(result, def, staticRow);
+ addValue(result, def, staticRow, options);
break;
}
}
}
}
- private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row)
+ private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
{
if (row == null)
{
@@ -1112,7 +1112,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
List<Cell> collection = row.getCollection(def.name);
ByteBuffer value = collection == null
? null
- : ((CollectionType)def.type).serialize(collection);
+ : ((CollectionType)def.type).serializeForNativeProtocol(collection, options.getProtocolVersion());
result.add(value);
return;
}
@@ -1137,7 +1137,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
/**
* Orders results when multiple keys are selected (using IN)
*/
- private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
+ private void orderResults(ResultSet cqlRows) throws InvalidRequestException
{
if (cqlRows.size() == 0 || !needsPostQueryOrdering())
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 6f9a270..3902e05 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -362,7 +362,7 @@ public class DefsTables
dropType(type);
for (MapDifference.ValueDifference<UserType> tdiff : typesDiff.entriesDiffering().values())
- addType(tdiff.rightValue()); // use the most recent value
+ updateType(tdiff.rightValue()); // use the most recent value
}
}
}
@@ -412,7 +412,7 @@ public class DefsTables
ksm.userTypes.addType(ut);
if (!StorageService.instance.isClientMode())
- MigrationManager.instance.notifyUpdateKeyspace(ksm);
+ MigrationManager.instance.notifyCreateUserType(ut);
}
private static void updateKeyspace(KSMetaData newState)
@@ -444,6 +444,19 @@ public class DefsTables
}
}
+ private static void updateType(UserType ut)
+ {
+ KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Updating {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ if (!StorageService.instance.isClientMode())
+ MigrationManager.instance.notifyUpdateUserType(ut);
+ }
+
private static void dropKeyspace(String ksName)
{
KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -515,7 +528,7 @@ public class DefsTables
ksm.userTypes.removeType(ut);
if (!StorageService.instance.isClientMode())
- MigrationManager.instance.notifyUpdateKeyspace(ksm);
+ MigrationManager.instance.notifyUpdateUserType(ut);
}
private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 5db4ba0..7f75a5f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -57,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
protected abstract void appendToStringBuilder(StringBuilder sb);
- public abstract ByteBuffer serialize(List<Cell> cells);
+ public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
@Override
public String toString()
@@ -110,22 +111,9 @@ public abstract class CollectionType<T> extends AbstractType<T>
return true;
}
- // Utilitary method
- protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+ protected List<Cell> enforceLimit(List<Cell> cells, int version)
{
- ByteBuffer result = ByteBuffer.allocate(2 + size);
- result.putShort((short)elements);
- for (ByteBuffer bb : buffers)
- {
- result.putShort((short)bb.remaining());
- result.put(bb.duplicate());
- }
- return (ByteBuffer)result.flip();
- }
-
- protected List<Cell> enforceLimit(List<Cell> cells)
- {
- if (cells.size() <= MAX_ELEMENTS)
+ if (version >= 3 || cells.size() <= MAX_ELEMENTS)
return cells;
logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. "
@@ -133,12 +121,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
return cells.subList(0, MAX_ELEMENTS);
}
- public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+ public ByteBuffer serializeForNativeProtocol(List<Cell> cells, int version)
{
- int size = 0;
- for (ByteBuffer bb : buffers)
- size += 2 + bb.remaining();
- return pack(buffers, elements, size);
+ cells = enforceLimit(cells, version);
+ List<ByteBuffer> values = serializedValues(cells);
+ return CollectionSerializer.pack(values, cells.size(), version);
}
public CQL3Type asCQL3Type()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 43ace65..6e6821b 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -72,7 +72,7 @@ public class ListType<T> extends CollectionType<List<T>>
return elements;
}
- public TypeSerializer<List<T>> getSerializer()
+ public ListSerializer<T> getSerializer()
{
return serializer;
}
@@ -112,17 +112,11 @@ public class ListType<T> extends CollectionType<List<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- int size = 0;
for (Cell c : cells)
- {
bbs.add(c.value());
- size += 2 + c.value().remaining();
- }
- return pack(bbs, cells.size(), size);
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 213e213..71023a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -108,7 +108,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
}
@Override
- public TypeSerializer<Map<K, V>> getSerializer()
+ public MapSerializer<K, V> getSerializer()
{
return serializer;
}
@@ -123,23 +123,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values)));
}
- /**
- * Creates the same output than serialize, but from the internal representation.
- */
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * cells.size());
- int size = 0;
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
for (Cell c : cells)
{
- ByteBuffer key = c.name().collectionElement();
- ByteBuffer value = c.value();
- bbs.add(key);
- bbs.add(value);
- size += 4 + key.remaining() + value.remaining();
+ bbs.add(c.name().collectionElement());
+ bbs.add(c.value());
}
- return pack(bbs, cells.size(), size);
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 3b686b8..d2f7f12 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -77,7 +77,7 @@ public class SetType<T> extends CollectionType<Set<T>>
return ListType.compareListOrSet(elements, o1, o2);
}
- public TypeSerializer<Set<T>> getSerializer()
+ public SetSerializer<T> getSerializer()
{
return serializer;
}
@@ -92,18 +92,11 @@ public class SetType<T> extends CollectionType<Set<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(List<Cell> cells)
{
- cells = enforceLimit(cells);
-
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- int size = 0;
for (Cell c : cells)
- {
- ByteBuffer key = c.name().collectionElement();
- bbs.add(key);
- size += 2 + key.remaining();
- }
- return pack(bbs, cells.size(), size);
+ bbs.add(c.name().collectionElement());
+ return bbs;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index eb95fb9..973a5be 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -64,6 +64,11 @@ public class UserType extends CompositeType
return new UserType(keyspace, name, columnNames, columnTypes);
}
+ public String getNameAsString()
+ {
+ return UTF8Type.instance.compose(name);
+ }
+
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index af88853..9e3abcf 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -431,8 +432,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
{
ByteBuffer buffer = objToBB(sub);
serialized.add(buffer);
- }
- return CollectionType.pack(serialized, objects.size());
+ }
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToMapBB(List<Object> objects)
@@ -447,7 +450,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
serialized.add(buffer);
}
}
- return CollectionType.pack(serialized, objects.size());
+ // NOTE: using protocol v1 serialization format for collections so as to not break
+ // compatibility. Not sure if that's the right thing.
+ return CollectionSerializer.pack(serialized, objects.size(), 1);
}
private ByteBuffer objToCompositeBB(List<Object> objects)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 9b7a8e7..6993b19 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -195,14 +195,15 @@ public class CQLSSTableWriter
if (values.size() != boundNames.size())
throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
- List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
- Composite clusteringPrefix = insert.createClusteringPrefix(values);
+ QueryOptions options = QueryOptions.forInternalCalls(null, values);
+ List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+ Composite clusteringPrefix = insert.createClusteringPrefix(options);
long now = System.currentTimeMillis() * 1000;
UpdateParameters params = new UpdateParameters(insert.cfm,
- values,
- insert.getTimestamp(now, values),
- insert.getTimeToLive(values),
+ options,
+ insert.getTimestamp(now, options),
+ insert.getTimeToLive(options),
Collections.<ByteBuffer, CQL3Row>emptyMap());
for (ByteBuffer key: keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 83a391d..0e16fda 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.serializers;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
public abstract class CollectionSerializer<T> implements TypeSerializer<T>
{
public void validate(ByteBuffer bytes) throws MarshalException
@@ -28,24 +30,104 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
// The collection is not currently being properly validated.
}
- // Utilitary method
- protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+ protected abstract List<ByteBuffer> serializeValues(T value);
+ protected abstract int getElementCount(T value);
+
+ public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version);
+
+ public ByteBuffer serialize(T value)
+ {
+ List<ByteBuffer> values = serializeValues(value);
+ // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+ // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+ return pack(values, getElementCount(value), 3);
+ }
+
+ public T deserialize(ByteBuffer bytes)
{
- ByteBuffer result = ByteBuffer.allocate(2 + size);
- result.putShort((short)elements);
+ // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+ // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+ return deserializeForNativeProtocol(bytes, 3);
+ }
+
+ public static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int version)
+ {
+ int size = 0;
+ for (ByteBuffer bb : buffers)
+ size += sizeOfValue(bb, version);
+
+ ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, version) + size);
+ writeCollectionSize(result, elements, version);
for (ByteBuffer bb : buffers)
+ writeValue(result, bb, version);
+ return (ByteBuffer)result.flip();
+ }
+
+ protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
+ {
+ if (version >= 3)
+ output.putInt(elements);
+ else
+ output.putShort((short)elements);
+ }
+
+ protected static int readCollectionSize(ByteBuffer input, int version)
+ {
+ return version >= 3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+ }
+
+ protected static int sizeOfCollectionSize(int elements, int version)
+ {
+ return version >= 3 ? 4 : 2;
+ }
+
+ protected static void writeValue(ByteBuffer output, ByteBuffer value, int version)
+ {
+ if (version >= 3)
{
- result.putShort((short)bb.remaining());
- result.put(bb.duplicate());
+ if (value == null)
+ {
+ output.putInt(-1);
+ return;
+ }
+
+ output.putInt(value.remaining());
+ output.put(value.duplicate());
+ }
+ else
+ {
+ assert value != null;
+ output.putShort((short)value.remaining());
+ output.put(value.duplicate());
}
- return (ByteBuffer)result.flip();
}
- public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+ protected static ByteBuffer readValue(ByteBuffer input, int version)
{
- int size = 0;
- for (ByteBuffer bb : buffers)
- size += 2 + bb.remaining();
- return pack(buffers, elements, size);
+ if (version >= 3)
+ {
+ int size = input.getInt();
+ if (size < 0)
+ return null;
+
+ return ByteBufferUtil.readBytes(input, size);
+ }
+ else
+ {
+ return ByteBufferUtil.readBytesWithShortLength(input);
+ }
+ }
+
+ protected static int sizeOfValue(ByteBuffer value, int version)
+ {
+ if (version >= 3)
+ {
+ return value == null ? 4 : 4 + value.remaining();
+ }
+ else
+ {
+ assert value != null;
+ return 2 + value.remaining();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index 59f25d2..e662341 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -47,16 +47,29 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
this.elements = elements;
}
- public List<T> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(List<T> values)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(values.size());
+ for (T value : values)
+ buffers.add(elements.serialize(value));
+ return buffers;
+ }
+
+ public int getElementCount(List<T> value)
+ {
+ return value.size();
+ }
+
+ public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
List<T> l = new ArrayList<T>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer databb = readValue(input, version);
elements.validate(databb);
l.add(elements.deserialize(databb));
}
@@ -68,26 +81,6 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
}
}
- /**
- * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
- * where:
- * n is the number of elements
- * s_i is the number of bytes composing the ith element
- * b_i is the s_i bytes composing the ith element
- */
- public ByteBuffer serialize(List<T> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
- int size = 0;
- for (T elt : value)
- {
- ByteBuffer bb = elements.serialize(elt);
- bbs.add(bb);
- size += 2 + bb.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(List<T> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index f79d07f..5d349dd 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -51,19 +51,35 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
this.values = values;
}
- public Map<K, V> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(Map<K, V> map)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(map.size() * 2);
+ for (Map.Entry<K, V> entry : map.entrySet())
+ {
+ buffers.add(keys.serialize(entry.getKey()));
+ buffers.add(values.serialize(entry.getValue()));
+ }
+ return buffers;
+ }
+
+ public int getElementCount(Map<K, V> value)
+ {
+ return value.size();
+ }
+
+ public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
Map<K, V> m = new LinkedHashMap<K, V>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer kbb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer kbb = readValue(input, version);
keys.validate(kbb);
- ByteBuffer vbb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer vbb = readValue(input, version);
values.validate(vbb);
m.put(keys.deserialize(kbb), values.deserialize(vbb));
@@ -76,30 +92,6 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
}
}
- /**
- * Layout is: {@code <n><sk_1><k_1><sv_1><v_1>...<sk_n><k_n><sv_n><v_n> }
- * where:
- * n is the number of elements
- * sk_i is the number of bytes composing the ith key k_i
- * k_i is the sk_i bytes composing the ith key
- * sv_i is the number of bytes composing the ith value v_i
- * v_i is the sv_i bytes composing the ith value
- */
- public ByteBuffer serialize(Map<K, V> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * value.size());
- int size = 0;
- for (Map.Entry<K, V> entry : value.entrySet())
- {
- ByteBuffer bbk = keys.serialize(entry.getKey());
- ByteBuffer bbv = values.serialize(entry.getValue());
- bbs.add(bbk);
- bbs.add(bbv);
- size += 4 + bbk.remaining() + bbv.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(Map<K, V> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index d6d7062..812dd68 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -47,16 +47,29 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
this.elements = elements;
}
- public Set<T> deserialize(ByteBuffer bytes)
+ public List<ByteBuffer> serializeValues(Set<T> values)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>(values.size());
+ for (T value : values)
+ buffers.add(elements.serialize(value));
+ return buffers;
+ }
+
+ public int getElementCount(Set<T> value)
+ {
+ return value.size();
+ }
+
+ public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
{
try
{
ByteBuffer input = bytes.duplicate();
- int n = ByteBufferUtil.readShortLength(input);
+ int n = readCollectionSize(input, version);
Set<T> l = new LinkedHashSet<T>(n);
for (int i = 0; i < n; i++)
{
- ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+ ByteBuffer databb = readValue(input, version);
elements.validate(databb);
l.add(elements.deserialize(databb));
}
@@ -68,26 +81,6 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
}
}
- /**
- * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
- * where:
- * n is the number of elements
- * s_i is the number of bytes composing the ith element
- * b_i is the s_i bytes composing the ith element
- */
- public ByteBuffer serialize(Set<T> value)
- {
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
- int size = 0;
- for (T elt : value)
- {
- ByteBuffer bb = elements.serialize(elt);
- bbs.add(bb);
- size += 2 + bb.remaining();
- }
- return pack(bbs, value.size(), size);
- }
-
public String toString(Set<T> value)
{
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index e16ac62..4d142bd 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -21,10 +21,13 @@ public interface IMigrationListener
{
public void onCreateKeyspace(String ksName);
public void onCreateColumnFamily(String ksName, String cfName);
+ public void onCreateUserType(String ksName, String typeName);
public void onUpdateKeyspace(String ksName);
public void onUpdateColumnFamily(String ksName, String cfName);
+ public void onUpdateUserType(String ksName, String typeName);
public void onDropKeyspace(String ksName);
public void onDropColumnFamily(String ksName, String cfName);
+ public void onDropUserType(String ksName, String typeName);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 7eb7282..ec46d3f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -167,6 +167,12 @@ public class MigrationManager
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyCreateUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public void notifyUpdateKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
@@ -179,6 +185,12 @@ public class MigrationManager
listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyUpdateUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public void notifyDropKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
@@ -191,6 +203,12 @@ public class MigrationManager
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyDropUserType(UserType ut)
+ {
+ for (IMigrationListener listener : listeners)
+ listener.onDropUserType(ut.keyspace, ut.getNameAsString());
+ }
+
public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
{
announceNewKeyspace(ksm, FBUtilities.timestampMicros());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index c4abe0b..3040aaf 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1969,7 +1969,7 @@ public class CassandraServer implements Cassandra.Iface
}
ThriftClientState cState = state();
- return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
+ return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
}
catch (RequestExecutionException e)
{
@@ -2100,7 +2100,7 @@ public class CassandraServer implements Cassandra.Iface
return cState.getCQLQueryHandler().processPrepared(statement,
cState.getQueryState(),
- new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
+ QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
}
catch (RequestExecutionException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5222a1..36a7e71 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -363,6 +364,22 @@ public abstract class CBUtil
return size;
}
+ public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb)
+ {
+ int size = cb.readUnsignedShort();
+ if (size == 0)
+ return Pair.create(Collections.<String>emptyList(), Collections.<ByteBuffer>emptyList());
+
+ List<String> s = new ArrayList<>(size);
+ List<ByteBuffer> l = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ s.add(readString(cb));
+ l.add(readValue(cb));
+ }
+ return Pair.create(s, l);
+ }
+
public static InetSocketAddress readInet(ByteBuf cb)
{
int addrSize = cb.readByte();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4a50bde..989b954 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -128,7 +128,7 @@ public class Client extends SimpleClient
return null;
}
}
- return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
+ return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
}
else if (msgType.equals("PREPARE"))
{
@@ -156,7 +156,7 @@ public class Client extends SimpleClient
}
values.add(bb);
}
- return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values));
+ return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index f0b5d95..3cff973 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.transport;
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -51,7 +51,9 @@ public enum DataType implements OptionCodec.Codecable<DataType>
INET (16, InetAddressType.instance),
LIST (32, null),
MAP (33, null),
- SET (34, null);
+ SET (34, null),
+ UDT (48, null);
+
public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
@@ -78,27 +80,39 @@ public enum DataType implements OptionCodec.Codecable<DataType>
return id;
}
- public Object readValue(ByteBuf cb)
+ public Object readValue(ByteBuf cb, int version)
{
switch (this)
{
case CUSTOM:
return CBUtil.readString(cb);
case LIST:
- return DataType.toType(codec.decodeOne(cb));
+ return DataType.toType(codec.decodeOne(cb, version));
case SET:
- return DataType.toType(codec.decodeOne(cb));
+ return DataType.toType(codec.decodeOne(cb, version));
case MAP:
List<AbstractType> l = new ArrayList<AbstractType>(2);
- l.add(DataType.toType(codec.decodeOne(cb)));
- l.add(DataType.toType(codec.decodeOne(cb)));
+ l.add(DataType.toType(codec.decodeOne(cb, version)));
+ l.add(DataType.toType(codec.decodeOne(cb, version)));
return l;
+ case UDT:
+ String ks = CBUtil.readString(cb);
+ ByteBuffer name = UTF8Type.instance.decompose(CBUtil.readString(cb));
+ int n = cb.readUnsignedShort();
+ List<ByteBuffer> fieldNames = new ArrayList<>(n);
+ List<AbstractType<?>> fieldTypes = new ArrayList<>(n);
+ for (int i = 0; i < n; i++)
+ {
+ fieldNames.add(UTF8Type.instance.decompose(CBUtil.readString(cb)));
+ fieldTypes.add(DataType.toType(codec.decodeOne(cb, version)));
+ }
+ return new UserType(ks, name, fieldNames, fieldTypes);
default:
return null;
}
}
- public void writeValue(Object value, ByteBuf cb)
+ public void writeValue(Object value, ByteBuf cb, int version)
{
switch (this)
{
@@ -107,40 +121,63 @@ public enum DataType implements OptionCodec.Codecable<DataType>
CBUtil.writeString((String)value, cb);
break;
case LIST:
- codec.writeOne(DataType.fromType((AbstractType)value), cb);
+ codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
break;
case SET:
- codec.writeOne(DataType.fromType((AbstractType)value), cb);
+ codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
break;
case MAP:
List<AbstractType> l = (List<AbstractType>)value;
- codec.writeOne(DataType.fromType(l.get(0)), cb);
- codec.writeOne(DataType.fromType(l.get(1)), cb);
+ codec.writeOne(DataType.fromType(l.get(0), version), cb, version);
+ codec.writeOne(DataType.fromType(l.get(1), version), cb, version);
+ break;
+ case UDT:
+ UserType udt = (UserType)value;
+ CBUtil.writeString(udt.keyspace, cb);
+ CBUtil.writeString(UTF8Type.instance.compose(udt.name), cb);
+ cb.writeShort(udt.columnNames.size());
+ for (int i = 0; i < udt.columnNames.size(); i++)
+ {
+ CBUtil.writeString(UTF8Type.instance.compose(udt.columnNames.get(i)), cb);
+ codec.writeOne(DataType.fromType(udt.types.get(i), version), cb, version);
+ }
break;
}
}
- public int serializedValueSize(Object value)
+ public int serializedValueSize(Object value, int version)
{
switch (this)
{
case CUSTOM:
- return 2 + ((String)value).getBytes(StandardCharsets.UTF_8).length;
+ return CBUtil.sizeOfString((String)value);
case LIST:
case SET:
- return codec.oneSerializedSize(DataType.fromType((AbstractType)value));
+ return codec.oneSerializedSize(DataType.fromType((AbstractType)value, version), version);
case MAP:
List<AbstractType> l = (List<AbstractType>)value;
int s = 0;
- s += codec.oneSerializedSize(DataType.fromType(l.get(0)));
- s += codec.oneSerializedSize(DataType.fromType(l.get(1)));
+ s += codec.oneSerializedSize(DataType.fromType(l.get(0), version), version);
+ s += codec.oneSerializedSize(DataType.fromType(l.get(1), version), version);
return s;
+ case UDT:
+ UserType udt = (UserType)value;
+ int size = 0;
+ size += CBUtil.sizeOfString(udt.keyspace);
+ size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.name));
+ size += 2;
+ for (int i = 0; i < udt.columnNames.size(); i++)
+ {
+ size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.columnNames.get(i)));
+ size += codec.oneSerializedSize(DataType.fromType(udt.types.get(i), version), version);
+ }
+ return size;
default:
return 0;
}
}
- public static Pair<DataType, Object> fromType(AbstractType type)
+ public static Pair<DataType, Object> fromType(AbstractType type, int version)
{
// For CQL3 clients, ReversedType is an implementation detail and they
// shouldn't have to care about it.
@@ -170,6 +207,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
return Pair.<DataType, Object>create(SET, ((SetType)type).elements);
}
}
+
+ if (type instanceof UserType && version >= 3)
+ return Pair.<DataType, Object>create(UDT, type);
+
return Pair.<DataType, Object>create(CUSTOM, type.toString());
}
else
@@ -193,6 +234,8 @@ public enum DataType implements OptionCodec.Codecable<DataType>
case MAP:
List<AbstractType> l = (List<AbstractType>)entry.right;
return MapType.getInstance(l.get(0), l.get(1));
+ case UDT:
+ return (AbstractType)entry.right;
default:
return entry.left.type;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 242ad64..7ec026e 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
public abstract class Event
@@ -33,33 +34,33 @@ public abstract class Event
this.type = type;
}
- public static Event deserialize(ByteBuf cb)
+ public static Event deserialize(ByteBuf cb, int version)
{
switch (CBUtil.readEnumValue(Type.class, cb))
{
case TOPOLOGY_CHANGE:
- return TopologyChange.deserializeEvent(cb);
+ return TopologyChange.deserializeEvent(cb, version);
case STATUS_CHANGE:
- return StatusChange.deserializeEvent(cb);
+ return StatusChange.deserializeEvent(cb, version);
case SCHEMA_CHANGE:
- return SchemaChange.deserializeEvent(cb);
+ return SchemaChange.deserializeEvent(cb, version);
}
throw new AssertionError();
}
- public void serialize(ByteBuf dest)
+ public void serialize(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(type, dest);
- serializeEvent(dest);
+ serializeEvent(dest, version);
}
- public int serializedSize()
+ public int serializedSize(int version)
{
- return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+ return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version);
}
- protected abstract void serializeEvent(ByteBuf dest);
- protected abstract int eventSerializedSize();
+ protected abstract void serializeEvent(ByteBuf dest, int version);
+ protected abstract int eventSerializedSize(int version);
public static class TopologyChange extends Event
{
@@ -91,20 +92,20 @@ public abstract class Event
}
// Assumes the type has already been deserialized
- private static TopologyChange deserializeEvent(ByteBuf cb)
+ private static TopologyChange deserializeEvent(ByteBuf cb, int version)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
InetSocketAddress node = CBUtil.readInet(cb);
return new TopologyChange(change, node);
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(change, dest);
CBUtil.writeInet(node, dest);
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
}
@@ -114,6 +115,23 @@ public abstract class Event
{
return change + " " + node;
}
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(change, node);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof TopologyChange))
+ return false;
+
+ TopologyChange tpc = (TopologyChange)other;
+ return Objects.equal(change, tpc.change)
+ && Objects.equal(node, tpc.node);
+ }
}
public static class StatusChange extends Event
@@ -141,20 +159,20 @@ public abstract class Event
}
// Assumes the type has already been deserialized
- private static StatusChange deserializeEvent(ByteBuf cb)
+ private static StatusChange deserializeEvent(ByteBuf cb, int version)
{
Status status = CBUtil.readEnumValue(Status.class, cb);
InetSocketAddress node = CBUtil.readInet(cb);
return new StatusChange(status, node);
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
CBUtil.writeEnumValue(status, dest);
CBUtil.writeInet(node, dest);
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
}
@@ -164,56 +182,130 @@ public abstract class Event
{
return status + " " + node;
}
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(status, node);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof StatusChange))
+ return false;
+
+ StatusChange stc = (StatusChange)other;
+ return Objects.equal(status, stc.status)
+ && Objects.equal(node, stc.node);
+ }
}
public static class SchemaChange extends Event
{
public enum Change { CREATED, UPDATED, DROPPED }
+ public enum Target { KEYSPACE, TABLE, TYPE }
public final Change change;
+ public final Target target;
public final String keyspace;
- public final String table;
+ public final String tableOrType;
- public SchemaChange(Change change, String keyspace, String table)
+ public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
{
super(Type.SCHEMA_CHANGE);
this.change = change;
+ this.target = target;
this.keyspace = keyspace;
- this.table = table;
+ this.tableOrType = tableOrType;
}
public SchemaChange(Change change, String keyspace)
{
- this(change, keyspace, "");
+ this(change, Target.KEYSPACE, keyspace, null);
}
// Assumes the type has already been deserialized
- private static SchemaChange deserializeEvent(ByteBuf cb)
+ private static SchemaChange deserializeEvent(ByteBuf cb, int version)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
- String keyspace = CBUtil.readString(cb);
- String table = CBUtil.readString(cb);
- return new SchemaChange(change, keyspace, table);
+ if (version >= 3)
+ {
+ Target target = CBUtil.readEnumValue(Target.class, cb);
+ String keyspace = CBUtil.readString(cb);
+ String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
+ return new SchemaChange(change, target, keyspace, tableOrType);
+ }
+ else
+ {
+ String keyspace = CBUtil.readString(cb);
+ String table = CBUtil.readString(cb);
+ return new SchemaChange(change, table.isEmpty() ? Target.KEYSPACE : Target.TABLE, keyspace, table.isEmpty() ? null : table);
+ }
}
- protected void serializeEvent(ByteBuf dest)
+ protected void serializeEvent(ByteBuf dest, int version)
{
- CBUtil.writeEnumValue(change, dest);
- CBUtil.writeString(keyspace, dest);
- CBUtil.writeString(table, dest);
+ if (version >= 3)
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeEnumValue(target, dest);
+ CBUtil.writeString(keyspace, dest);
+ if (target != Target.KEYSPACE)
+ CBUtil.writeString(tableOrType, dest);
+ }
+ else
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+ }
}
- protected int eventSerializedSize()
+ protected int eventSerializedSize(int version)
{
- return CBUtil.sizeOfEnumValue(change)
- + CBUtil.sizeOfString(keyspace)
- + CBUtil.sizeOfString(table);
+ if (version >= 3)
+ {
+ int size = CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfEnumValue(target)
+ + CBUtil.sizeOfString(keyspace);
+
+ if (target != Target.KEYSPACE)
+ size += CBUtil.sizeOfString(tableOrType);
+
+ return size;
+ }
+ else
+ {
+ return CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+ }
}
@Override
public String toString()
{
- return change + " " + keyspace + (table.isEmpty() ? "" : "." + table);
+ return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(change, target, keyspace, tableOrType);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof SchemaChange))
+ return false;
+
+ SchemaChange scc = (SchemaChange)other;
+ return Objects.equal(change, scc.change)
+ && Objects.equal(target, scc.target)
+ && Objects.equal(keyspace, scc.keyspace)
+ && Objects.equal(tableOrType, scc.tableOrType);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 9b82bda..ec2a1fa 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -32,9 +32,9 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
{
public int getId();
- public Object readValue(ByteBuf cb);
- public void writeValue(Object value, ByteBuf cb);
- public int serializedValueSize(Object obj);
+ public Object readValue(ByteBuf cb, int version);
+ public void writeValue(Object value, ByteBuf cb, int version);
+ public int serializedValueSize(Object obj, int version);
}
private final Class<T> klass;
@@ -66,14 +66,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
return opt;
}
- public Map<T, Object> decode(ByteBuf body)
+ public Map<T, Object> decode(ByteBuf body, int version)
{
EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
int n = body.readUnsignedShort();
for (int i = 0; i < n; i++)
{
T opt = fromId(body.readUnsignedShort());
- Object value = opt.readValue(body);
+ Object value = opt.readValue(body, version);
if (options.containsKey(opt))
throw new ProtocolException(String.format("Duplicate option %s in message", opt.name()));
options.put(opt, value);
@@ -81,41 +81,41 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
return options;
}
- public ByteBuf encode(Map<T, Object> options)
+ public ByteBuf encode(Map<T, Object> options, int version)
{
int optLength = 2;
for (Map.Entry<T, Object> entry : options.entrySet())
- optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
+ optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version);
ByteBuf cb = Unpooled.buffer(optLength);
cb.writeShort(options.size());
for (Map.Entry<T, Object> entry : options.entrySet())
{
T opt = entry.getKey();
cb.writeShort(opt.getId());
- opt.writeValue(entry.getValue(), cb);
+ opt.writeValue(entry.getValue(), cb, version);
}
return cb;
}
- public Pair<T, Object> decodeOne(ByteBuf body)
+ public Pair<T, Object> decodeOne(ByteBuf body, int version)
{
T opt = fromId(body.readUnsignedShort());
- Object value = opt.readValue(body);
+ Object value = opt.readValue(body, version);
return Pair.create(opt, value);
}
- public void writeOne(Pair<T, Object> option, ByteBuf dest)
+ public void writeOne(Pair<T, Object> option, ByteBuf dest, int version)
{
T opt = option.left;
Object obj = option.right;
dest.writeShort(opt.getId());
- opt.writeValue(obj, dest);
+ opt.writeValue(obj, dest, version);
}
- public int oneSerializedSize(Pair<T, Object> option)
+ public int oneSerializedSize(Pair<T, Object> option, int version)
{
T opt = option.left;
Object obj = option.right;
- return 2 + opt.serializedValueSize(obj);
+ return 2 + opt.serializedValueSize(obj, version);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8d08ffd..eb2b043 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -378,7 +378,12 @@ public class Server implements CassandraDaemon.Server
public void onCreateColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onCreateUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onUpdateKeyspace(String ksName)
@@ -388,7 +393,12 @@ public class Server implements CassandraDaemon.Server
public void onUpdateColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onUpdateUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onDropKeyspace(String ksName)
@@ -398,7 +408,12 @@ public class Server implements CassandraDaemon.Server
public void onDropColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName));
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ }
+
+ public void onDropUserType(String ksName, String typeName)
+ {
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ef56882..3cf9b7b 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -157,7 +157,7 @@ public class SimpleClient
public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
{
- Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values)));
+ Message.Response msg = execute(new QueryMessage(query, QueryOptions.forInternalCalls(consistencyLevel, values)));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
@@ -171,7 +171,7 @@ public class SimpleClient
public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values)));
+ Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values)));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}